diff --git a/src/main/java/com/squareup/okhttp/ConnectionPool.java b/src/main/java/com/squareup/okhttp/ConnectionPool.java index 2d8fa5978..65f03281f 100644 --- a/src/main/java/com/squareup/okhttp/ConnectionPool.java +++ b/src/main/java/com/squareup/okhttp/ConnectionPool.java @@ -113,7 +113,7 @@ public final class ConnectionPool { */ public void recycle(Connection connection) { if (connection.isSpdy()) { - throw new IllegalArgumentException(); // TODO: just 'return' here? + return; } try { diff --git a/src/main/java/com/squareup/okhttp/internal/net/http/HttpEngine.java b/src/main/java/com/squareup/okhttp/internal/net/http/HttpEngine.java index abb9fa648..01512efc7 100644 --- a/src/main/java/com/squareup/okhttp/internal/net/http/HttpEngine.java +++ b/src/main/java/com/squareup/okhttp/internal/net/http/HttpEngine.java @@ -503,7 +503,6 @@ public class HttpEngine { requestHeaders.setHost(getOriginAddress(policy.getURL())); } - // TODO: this shouldn't be set for SPDY (it's ignored) if ((connection == null || connection.getHttpMinorVersion() != 0) && requestHeaders.getConnection() == null) { requestHeaders.setConnection("Keep-Alive"); @@ -511,7 +510,6 @@ public class HttpEngine { if (requestHeaders.getAcceptEncoding() == null) { transparentGzip = true; - // TODO: this shouldn't be set for SPDY (it isn't necessary) requestHeaders.setAcceptEncoding("gzip"); } diff --git a/src/main/java/com/squareup/okhttp/internal/net/http/RawHeaders.java b/src/main/java/com/squareup/okhttp/internal/net/http/RawHeaders.java index 0fa78ed10..a367c5ef0 100644 --- a/src/main/java/com/squareup/okhttp/internal/net/http/RawHeaders.java +++ b/src/main/java/com/squareup/okhttp/internal/net/http/RawHeaders.java @@ -393,6 +393,11 @@ public final class RawHeaders { throw new IllegalArgumentException("Unexpected header: " + name + ": " + value); } + // Drop headers that are ignored when layering HTTP over SPDY. + if (name.equals("connection") || name.equals("accept-encoding")) { + continue; + } + // If we haven't seen this name before, add the pair to the end of the list... if (names.add(name)) { result.add(name); diff --git a/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyConnection.java b/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyConnection.java index 19c5083d0..b2e0a3235 100644 --- a/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyConnection.java +++ b/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyConnection.java @@ -25,6 +25,7 @@ import java.io.OutputStream; import java.net.ProtocolException; import java.net.Socket; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; @@ -90,7 +91,9 @@ public final class SpdyConnection implements Closeable { private final ExecutorService callbackExecutor; private final Map streams = new HashMap(); + private int lastGoodStreamId; private int nextStreamId; + private boolean shutdown; /** Lazily-created map of in-flight pings awaiting a response. Guarded by this. */ private Map pings; @@ -144,6 +147,9 @@ public final class SpdyConnection implements Closeable { synchronized (spdyWriter) { synchronized (this) { + if (shutdown) { + throw new IOException("shutdown"); + } streamId = nextStreamId; nextStreamId += 2; stream = new SpdyStream(streamId, this, requestHeaders, flags); @@ -235,6 +241,23 @@ public final class SpdyConnection implements Closeable { } } + /** + * Degrades this connection such that new streams can neither be created + * locally, nor accepted from the remote peer. Existing streams are not + * impacted. This is intended to permit an endpoint to gracefully stop + * accepting new requests without harming previously established streams. + */ + public void shutdown() throws IOException { + synchronized (spdyWriter) { + int lastGoodStreamId; + synchronized (this) { + shutdown = true; + lastGoodStreamId = this.lastGoodStreamId; + } + spdyWriter.goAway(0, lastGoodStreamId); + } + } + @Override public void close() throws IOException { close(null); } @@ -320,6 +343,10 @@ public final class SpdyConnection implements Closeable { nameValueBlock, flags); final SpdyStream previous; synchronized (SpdyConnection.this) { + if (shutdown) { + return; + } + lastGoodStreamId = streamId; previous = streams.put(streamId, synStream); } if (previous != null) { @@ -385,5 +412,22 @@ public final class SpdyConnection implements Closeable { } } } + + @Override public void goAway(int flags, int lastGoodStreamId) { + synchronized (SpdyConnection.this) { + shutdown = true; + + // Fail all streams created after the last good stream ID. + for (Iterator> i = streams.entrySet().iterator(); + i.hasNext();) { + Map.Entry entry = i.next(); + int streamId = entry.getKey(); + if (streamId > lastGoodStreamId && entry.getValue().isLocallyInitiated()) { + entry.getValue().receiveRstStream(SpdyStream.RST_REFUSED_STREAM); + i.remove(); + } + } + } + } } } diff --git a/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyReader.java b/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyReader.java index 4ebc7ee2c..cdb58d615 100644 --- a/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyReader.java +++ b/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyReader.java @@ -113,6 +113,9 @@ final class SpdyReader { return true; case SpdyConnection.TYPE_GOAWAY: + readGoAway(handler, flags, length); + return true; + case SpdyConnection.TYPE_HEADERS: Streams.skipByReading(in, length); throw new UnsupportedOperationException("TODO"); @@ -227,6 +230,12 @@ final class SpdyReader { handler.ping(flags, id); } + private void readGoAway(Handler handler, int flags, int length) throws IOException { + if (length != 4) throw ioException("TYPE_GOAWAY length: %d != 4", length); + int lastGoodStreamId = in.readInt() & 0x7fffffff; + handler.goAway(flags, lastGoodStreamId); + } + private void readSettings(Handler handler, int flags, int length) throws IOException { int numberOfEntries = in.readInt(); if (length != 4 + 8 * numberOfEntries) { @@ -259,7 +268,7 @@ final class SpdyReader { void settings(int flags, Settings settings); void noop(); void ping(int flags, int streamId); - // TODO: goaway + void goAway(int flags, int lastGoodStreamId); // TODO: headers } } diff --git a/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyStream.java b/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyStream.java index 6b78ca381..0bc9645f7 100644 --- a/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyStream.java +++ b/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyStream.java @@ -39,6 +39,17 @@ public final class SpdyStream { private static final int DATA_FRAME_HEADER_LENGTH = 8; + private static final String[] STATUS_CODE_NAMES = { + null, + "PROTOCOL_ERROR", + "INVALID_STREAM", + "REFUSED_STREAM", + "UNSUPPORTED_VERSION", + "CANCEL", + "INTERNAL_ERROR", + "FLOW_CONTROL_ERROR", + }; + public static final int RST_PROTOCOL_ERROR = 1; public static final int RST_INVALID_STREAM = 2; public static final int RST_REFUSED_STREAM = 3; @@ -110,7 +121,7 @@ public final class SpdyStream { if (responseHeaders != null) { return responseHeaders; } - throw new IOException("stream was reset: " + rstStatusCode); + throw new IOException("stream was reset: " + rstStatusString()); } catch (InterruptedException e) { InterruptedIOException rethrow = new InterruptedIOException(); rethrow.initCause(e); @@ -203,13 +214,13 @@ public final class SpdyStream { private boolean closeInternal(int rstStatusCode) { assert (!Thread.holdsLock(this)); synchronized (this) { - // TODO: no-op if inFinished == true and outFinished == true ? if (this.rstStatusCode != -1) { return false; } + if (in.finished && out.finished) { + return false; + } this.rstStatusCode = rstStatusCode; - in.finished = true; - out.finished = true; notifyAll(); } connection.removeStream(id); @@ -241,12 +252,16 @@ public final class SpdyStream { synchronized void receiveRstStream(int statusCode) { if (rstStatusCode == -1) { rstStatusCode = statusCode; - in.finished = true; - out.finished = true; notifyAll(); } } + private String rstStatusString() { + return rstStatusCode > 0 && rstStatusCode < STATUS_CODE_NAMES.length + ? STATUS_CODE_NAMES[rstStatusCode] + : Integer.toString(rstStatusCode); + } + /** * An input stream that reads the incoming data frames of a stream. Although * this class uses synchronization to safely receive incoming data frames, @@ -279,8 +294,8 @@ public final class SpdyStream { private boolean closed; /** - * True if either side has shut down this stream. We will receive no - * more bytes beyond those already in the buffer. + * True if either side has cleanly shut down this stream. We will + * receive no more bytes beyond those already in the buffer. */ private boolean finished; @@ -303,16 +318,15 @@ public final class SpdyStream { @Override public int read(byte[] b, int offset, int count) throws IOException { synchronized (SpdyStream.this) { - checkNotClosed(); checkOffsetAndCount(b.length, offset, count); - - while (pos == -1 && !finished) { + while (pos == -1 && !finished && !closed && rstStatusCode == -1) { try { SpdyStream.this.wait(); } catch (InterruptedException e) { throw new InterruptedIOException(); } } + checkNotClosed(); if (pos == -1) { return -1; @@ -407,6 +421,7 @@ public final class SpdyStream { @Override public void close() throws IOException { synchronized (SpdyStream.this) { closed = true; + SpdyStream.this.notifyAll(); } cancelStreamIfNecessary(); } @@ -415,22 +430,22 @@ public final class SpdyStream { if (closed) { throw new IOException("stream closed"); } + if (rstStatusCode != -1) { + throw new IOException("stream was reset: " + rstStatusString()); + } } } private void cancelStreamIfNecessary() throws IOException { assert (!Thread.holdsLock(SpdyStream.this)); synchronized (this) { - if (in.closed && !in.finished && (out.finished || out.closed)) { - // RST this stream to prevent additional data from being sent. This is safe because - // the input stream is closed (we won't use any further bytes) and the output stream - // is either finished or closed (so RSTing both streams won't cause harm). - in.finished = true; - } else { - // We shouldn't cancel this stream. - return; + if (!in.closed || in.finished || (!out.finished && !out.closed)) { + return; // We shouldn't cancel this stream (or don't need to). } } + // RST this stream to prevent additional data from being sent. This is safe because + // the input stream is closed (we won't use any further bytes) and the output stream + // is either finished or closed (so RSTing both streams doesn't cause harm). SpdyStream.this.close(RST_CANCEL); } @@ -446,8 +461,8 @@ public final class SpdyStream { private boolean closed; /** - * True if either side has shut down this stream. We shall send no more - * bytes. + * True if either side has cleanly shut down this stream. We shall send + * no more bytes. */ private boolean finished; @@ -511,10 +526,10 @@ public final class SpdyStream { synchronized (SpdyStream.this) { if (closed) { throw new IOException("stream closed"); - } - if (finished) { - throw new IOException("output stream finished " - + "(RST status code=" + rstStatusCode + ")"); + } else if (finished) { + throw new IOException("stream finished"); + } else if (rstStatusCode != -1) { + throw new IOException("stream was reset: " + rstStatusString()); } } } diff --git a/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyWriter.java b/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyWriter.java index 98b588ff2..2ef52e19e 100644 --- a/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyWriter.java +++ b/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyWriter.java @@ -140,4 +140,13 @@ final class SpdyWriter { out.writeInt(id); out.flush(); } + + public synchronized void goAway(int flags, int lastGoodStreamId) throws IOException { + int type = SpdyConnection.TYPE_GOAWAY; + int length = 4; + out.writeInt(0x80000000 | (SpdyConnection.VERSION & 0x7fff) << 16 | type & 0xffff); + out.writeInt((flags & 0xff) << 24 | length & 0xffffff); + out.writeInt(lastGoodStreamId); + out.flush(); + } } diff --git a/src/test/java/com/squareup/okhttp/internal/net/spdy/MockSpdyPeer.java b/src/test/java/com/squareup/okhttp/internal/net/spdy/MockSpdyPeer.java index 1f595d986..abe8a7427 100644 --- a/src/test/java/com/squareup/okhttp/internal/net/spdy/MockSpdyPeer.java +++ b/src/test/java/com/squareup/okhttp/internal/net/spdy/MockSpdyPeer.java @@ -81,6 +81,7 @@ public final class MockSpdyPeer { Socket socket = serverSocket.accept(); OutputStream out = socket.getOutputStream(); InputStream in = socket.getInputStream(); + SpdyReader reader = new SpdyReader(in); Iterator outFramesIterator = outFrames.iterator(); byte[] outBytes = bytesOut.toByteArray(); @@ -106,7 +107,6 @@ public final class MockSpdyPeer { } else { // read a frame - SpdyReader reader = new SpdyReader(in); InFrame inFrame = new InFrame(i, reader); reader.nextFrame(inFrame); inFrames.add(inFrame); @@ -201,5 +201,12 @@ public final class MockSpdyPeer { if (this.type != -1) throw new IllegalStateException(); this.type = SpdyConnection.TYPE_NOOP; } + + @Override public void goAway(int flags, int lastGoodStreamId) { + if (this.type != -1) throw new IllegalStateException(); + this.type = SpdyConnection.TYPE_GOAWAY; + this.flags = flags; + this.streamId = lastGoodStreamId; + } } } \ No newline at end of file diff --git a/src/test/java/com/squareup/okhttp/internal/net/spdy/SpdyConnectionTest.java b/src/test/java/com/squareup/okhttp/internal/net/spdy/SpdyConnectionTest.java index bbb47f43f..333b1c7ad 100644 --- a/src/test/java/com/squareup/okhttp/internal/net/spdy/SpdyConnectionTest.java +++ b/src/test/java/com/squareup/okhttp/internal/net/spdy/SpdyConnectionTest.java @@ -20,6 +20,7 @@ import static com.squareup.okhttp.internal.Util.UTF_8; import static com.squareup.okhttp.internal.net.spdy.Settings.PERSIST_VALUE; import static com.squareup.okhttp.internal.net.spdy.SpdyConnection.FLAG_FIN; import static com.squareup.okhttp.internal.net.spdy.SpdyConnection.TYPE_DATA; +import static com.squareup.okhttp.internal.net.spdy.SpdyConnection.TYPE_GOAWAY; import static com.squareup.okhttp.internal.net.spdy.SpdyConnection.TYPE_NOOP; import static com.squareup.okhttp.internal.net.spdy.SpdyConnection.TYPE_PING; import static com.squareup.okhttp.internal.net.spdy.SpdyConnection.TYPE_RST_STREAM; @@ -331,6 +332,7 @@ public final class SpdyConnectionTest { out.write("round".getBytes(UTF_8)); fail(); } catch (Exception expected) { + assertEquals("stream closed", expected.getMessage()); } // verify the peer received what was expected @@ -368,6 +370,7 @@ public final class SpdyConnectionTest { out.write("square".getBytes(UTF_8)); fail(); } catch (IOException expected) { + assertEquals("stream was reset: CANCEL", expected.getMessage()); } out.close(); @@ -402,11 +405,13 @@ public final class SpdyConnectionTest { in.read(); fail(); } catch (IOException expected) { + assertEquals("stream closed", expected.getMessage()); } try { out.write('a'); fail(); } catch (IOException expected) { + assertEquals("stream finished", expected.getMessage()); } // verify the peer received what was expected @@ -443,6 +448,7 @@ public final class SpdyConnectionTest { in.read(); fail(); } catch (IOException expected) { + assertEquals("stream closed", expected.getMessage()); } out.write("square".getBytes(UTF_8)); out.flush(); @@ -490,19 +496,29 @@ public final class SpdyConnectionTest { // write the mocking script peer.acceptFrame(); peer.sendFrame().synReply(0, 1, Arrays.asList("a", "android")); + peer.acceptFrame(); // PING peer.sendFrame().synReply(0, 1, Arrays.asList("b", "banana")); - peer.acceptFrame(); + peer.sendFrame().ping(0, 1); + peer.acceptFrame(); // RST STREAM peer.play(); // play it back SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); SpdyStream stream = connection.newStream(Arrays.asList("c", "cola"), true, true); assertEquals(Arrays.asList("a", "android"), stream.getResponseHeaders()); - assertStreamData("", stream.getInputStream()); + connection.ping().roundTripTime(); // Ensure that the 2nd SYN REPLY has been received. + try { + stream.getInputStream().read(); + fail(); + } catch (IOException e) { + assertEquals("stream was reset: PROTOCOL_ERROR", e.getMessage()); + } // verify the peer received what was expected MockSpdyPeer.InFrame synStream = peer.takeFrame(); assertEquals(TYPE_SYN_STREAM, synStream.type); + MockSpdyPeer.InFrame ping = peer.takeFrame(); + assertEquals(TYPE_PING, ping.type); MockSpdyPeer.InFrame rstStream = peer.takeFrame(); assertEquals(TYPE_RST_STREAM, rstStream.type); assertEquals(1, rstStream.streamId); @@ -611,6 +627,7 @@ public final class SpdyConnectionTest { stream.getResponseHeaders(); fail(); } catch (IOException expected) { + assertEquals("stream was reset: REFUSED_STREAM", expected.getMessage()); } // verify the peer received what was expected @@ -622,6 +639,75 @@ public final class SpdyConnectionTest { assertEquals(0, ping.flags); } + @Test public void receiveGoAway() throws Exception { + // write the mocking script + peer.acceptFrame(); // SYN STREAM 1 + peer.acceptFrame(); // SYN STREAM 3 + peer.sendFrame().goAway(0, 1); + peer.acceptFrame(); // PING + peer.sendFrame().ping(0, 1); + peer.acceptFrame(); // DATA STREAM 1 + peer.play(); + + // play it back + SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); + SpdyStream stream1 = connection.newStream(Arrays.asList("a", "android"), true, true); + SpdyStream stream2 = connection.newStream(Arrays.asList("b", "banana"), true, true); + connection.ping().roundTripTime(); // Ensure that the GO_AWAY has been received. + stream1.getOutputStream().write("abc".getBytes(UTF_8)); + try { + stream2.getOutputStream().write("abc".getBytes(UTF_8)); + fail(); + } catch (IOException expected) { + assertEquals("stream was reset: REFUSED_STREAM", expected.getMessage()); + } + stream1.getOutputStream().write("def".getBytes(UTF_8)); + stream1.getOutputStream().close(); + try { + connection.newStream(Arrays.asList("c", "cola"), true, true); + fail(); + } catch (IOException expected) { + assertEquals("shutdown", expected.getMessage()); + } + + // verify the peer received what was expected + MockSpdyPeer.InFrame synStream1 = peer.takeFrame(); + assertEquals(TYPE_SYN_STREAM, synStream1.type); + MockSpdyPeer.InFrame synStream2 = peer.takeFrame(); + assertEquals(TYPE_SYN_STREAM, synStream2.type); + MockSpdyPeer.InFrame ping = peer.takeFrame(); + assertEquals(TYPE_PING, ping.type); + MockSpdyPeer.InFrame data1 = peer.takeFrame(); + assertEquals(TYPE_DATA, data1.type); + assertEquals(1, data1.streamId); + assertTrue(Arrays.equals("abcdef".getBytes("UTF-8"), data1.data)); + } + + @Test public void sendGoAway() throws Exception { + // write the mocking script + peer.acceptFrame(); // SYN STREAM 1 + peer.acceptFrame(); // GOAWAY + peer.sendFrame().synStream(0, 2, 0, 0, Arrays.asList("b", "banana")); // Should be ignored! + peer.acceptFrame(); // PING + peer.sendFrame().ping(0, 1); + peer.play(); + + // play it back + SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); + connection.newStream(Arrays.asList("a", "android"), true, true); + connection.shutdown(); + connection.ping().roundTripTime(); // Ensure that the SYN STREAM has been received. + + // verify the peer received what was expected + MockSpdyPeer.InFrame synStream1 = peer.takeFrame(); + assertEquals(TYPE_SYN_STREAM, synStream1.type); + MockSpdyPeer.InFrame goaway = peer.takeFrame(); + assertEquals(TYPE_GOAWAY, goaway.type); + assertEquals(0, goaway.streamId); + MockSpdyPeer.InFrame ping = peer.takeFrame(); + assertEquals(TYPE_PING, ping.type); + } + private void writeAndClose(SpdyStream stream, String data) throws IOException { OutputStream out = stream.getOutputStream(); out.write(data.getBytes("UTF-8"));