From b5719b6d7faff6422e33897414c0afdf9e63fe95 Mon Sep 17 00:00:00 2001 From: jwilson Date: Mon, 31 Dec 2012 20:41:15 -0700 Subject: [PATCH] Implement SpdyConnection.close(). --- .../squareup/okhttp/internal/io/IoUtils.java | 23 +++++ .../okhttp/internal/net/spdy/Ping.java | 14 +++- .../internal/net/spdy/SpdyConnection.java | 63 ++++++++++---- .../okhttp/internal/net/spdy/SpdyReader.java | 13 ++- .../okhttp/internal/net/spdy/SpdyWriter.java | 8 +- .../internal/net/spdy/SpdyConnectionTest.java | 84 ++++++++++++++++++- 6 files changed, 176 insertions(+), 29 deletions(-) diff --git a/src/main/java/com/squareup/okhttp/internal/io/IoUtils.java b/src/main/java/com/squareup/okhttp/internal/io/IoUtils.java index 2b7ccb127..d8fa785a0 100644 --- a/src/main/java/com/squareup/okhttp/internal/io/IoUtils.java +++ b/src/main/java/com/squareup/okhttp/internal/io/IoUtils.java @@ -39,6 +39,29 @@ public final class IoUtils { } } + /** + * Closes {@code a} and {@code b}. If either close fails, this completes + * the other close and rethrows the first encountered exception. + */ + public static void closeAll(Closeable a, Closeable b) throws IOException { + Throwable thrown = null; + try { + a.close(); + } catch (Throwable e) { + thrown = e; + } + try { + b.close(); + } catch (Throwable e) { + if (thrown == null) thrown = e; + } + if (thrown == null) return; + if (thrown instanceof IOException) throw (IOException) thrown; + if (thrown instanceof RuntimeException) throw (RuntimeException) thrown; + if (thrown instanceof Error) throw (Error) thrown; + throw new AssertionError(thrown); + } + /** * Closes 'socket', ignoring any exceptions. Does nothing if 'socket' is null. */ diff --git a/src/main/java/com/squareup/okhttp/internal/net/spdy/Ping.java b/src/main/java/com/squareup/okhttp/internal/net/spdy/Ping.java index 5912831d4..dc712809e 100644 --- a/src/main/java/com/squareup/okhttp/internal/net/spdy/Ping.java +++ b/src/main/java/com/squareup/okhttp/internal/net/spdy/Ping.java @@ -40,9 +40,16 @@ public final class Ping { latch.countDown(); } + void cancel() { + if (received != -1 || sent == -1) throw new IllegalStateException(); + received = sent - 1; + latch.countDown(); + } + /** * Returns the round trip time for this ping in nanoseconds, waiting for the - * response to arrive if necessary. + * response to arrive if necessary. Returns -1 if the response was + * cancelled. */ public long roundTripTime() throws InterruptedException { latch.await(); @@ -51,13 +58,14 @@ public final class Ping { /** * Returns the round trip time for this ping in nanoseconds, or -1 if the - * timeout elapsed before the round trip completed. + * response was cancelled, or -2 if the timeout elapsed before the round + * trip completed. */ public long roundTripTime(long timeout, TimeUnit unit) throws InterruptedException { if (latch.await(timeout, unit)) { return received - sent; } else { - return -1; + return -2; } } } diff --git a/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyConnection.java b/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyConnection.java index 568c93a7c..27e7c30a3 100644 --- a/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyConnection.java +++ b/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyConnection.java @@ -16,6 +16,7 @@ package com.squareup.okhttp.internal.net.spdy; +import com.squareup.okhttp.internal.io.IoUtils; import com.squareup.okhttp.internal.io.Streams; import static com.squareup.okhttp.internal.net.spdy.Threads.newThreadFactory; import java.io.Closeable; @@ -206,6 +207,9 @@ public final class SpdyConnection implements Closeable { Ping ping = new Ping(); int pingId; synchronized (this) { + if (shutdown) { + throw new IOException("shutdown"); + } pingId = nextPingId; nextPingId += 2; if (pings == null) pings = new HashMap(); @@ -261,6 +265,9 @@ public final class SpdyConnection implements Closeable { synchronized (spdyWriter) { int lastGoodStreamId; synchronized (this) { + if (shutdown) { + return; + } shutdown = true; lastGoodStreamId = this.lastGoodStreamId; } @@ -268,20 +275,46 @@ public final class SpdyConnection implements Closeable { } } + /** + * Closes this connection. This cancels all open streams and unanswered + * pings. It closes the underlying input and output streams and shuts down + * internal executor services. + */ @Override public void close() throws IOException { - close(null); - } + shutdown(); - private synchronized void close(Throwable reason) throws IOException { - if (reason != null) { - reason.printStackTrace(); + SpdyStream[] streamsToClose = null; + Ping[] pingsToCancel = null; + synchronized (this) { + if (!streams.isEmpty()) { + streamsToClose = streams.values().toArray(new SpdyStream[streams.size()]); + streams.clear(); + } + if (pings != null) { + pingsToCancel = pings.values().toArray(new Ping[pings.size()]); + pings = null; + } } - // TODO: forward 'reason' to forced closed streams? - // TODO: graceful close; send RST frames - // TODO: close all streams to release waiting readers + + if (streamsToClose != null) { + for (SpdyStream stream : streamsToClose) { + try { + stream.close(SpdyStream.RST_CANCEL); + } catch (Throwable ignored) { + } + } + } + + if (pingsToCancel != null) { + for (Ping ping : pingsToCancel) { + ping.cancel(); + } + } + writeExecutor.shutdown(); - readExecutor.shutdown(); callbackExecutor.shutdown(); + readExecutor.shutdown(); + IoUtils.closeAll(spdyReader, spdyWriter); } public static class Builder { @@ -320,17 +353,13 @@ public final class SpdyConnection implements Closeable { private class Reader implements Runnable, SpdyReader.Handler { @Override public void run() { - Throwable failure = null; try { while (spdyReader.nextFrame(this)) { } - } catch (Throwable e) { - failure = e; - } - - try { - close(failure); - } catch (IOException ignored) { + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + IoUtils.closeQuietly(SpdyConnection.this); } } diff --git a/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyReader.java b/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyReader.java index cdb58d615..d49d4848e 100644 --- a/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyReader.java +++ b/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyReader.java @@ -16,9 +16,10 @@ package com.squareup.okhttp.internal.net.spdy; +import com.squareup.okhttp.internal.io.IoUtils; import com.squareup.okhttp.internal.io.Streams; +import java.io.Closeable; import java.io.DataInputStream; -import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.UnsupportedEncodingException; @@ -32,7 +33,7 @@ import java.util.zip.InflaterInputStream; /** * Read version 2 SPDY frames. */ -final class SpdyReader { +final class SpdyReader implements Closeable { private static final String DICTIONARY_STRING = "" + "optionsgetheadpostputdeletetraceacceptaccept-charsetaccept-encodingaccept-" + "languageauthorizationexpectfromhostif-modified-sinceif-matchif-none-matchi" @@ -73,8 +74,8 @@ final class SpdyReader { int w1; try { w1 = in.readInt(); - } catch (EOFException e) { - return false; + } catch (IOException e) { + return false; // This might be a normal socket close. } int w2 = in.readInt(); @@ -259,6 +260,10 @@ final class SpdyReader { throw new IOException(String.format(message, args)); } + @Override public void close() throws IOException { + IoUtils.closeAll(in, nameValueBlockIn); + } + public interface Handler { void data(int flags, int streamId, InputStream in, int length) throws IOException; void synStream(int flags, int streamId, int associatedStreamId, int priority, diff --git a/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyWriter.java b/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyWriter.java index 2ef52e19e..28514a2dc 100644 --- a/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyWriter.java +++ b/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyWriter.java @@ -17,7 +17,9 @@ package com.squareup.okhttp.internal.net.spdy; import com.squareup.okhttp.internal.Platform; +import com.squareup.okhttp.internal.io.IoUtils; import java.io.ByteArrayOutputStream; +import java.io.Closeable; import java.io.DataOutputStream; import java.io.IOException; import java.io.OutputStream; @@ -27,7 +29,7 @@ import java.util.zip.Deflater; /** * Write version 2 SPDY frames. */ -final class SpdyWriter { +final class SpdyWriter implements Closeable { final DataOutputStream out; private final ByteArrayOutputStream nameValueBlockBuffer; private final DataOutputStream nameValueBlockOut; @@ -149,4 +151,8 @@ final class SpdyWriter { out.writeInt(lastGoodStreamId); out.flush(); } + + @Override public void close() throws IOException { + IoUtils.closeAll(out, nameValueBlockOut); + } } diff --git a/src/test/java/com/squareup/okhttp/internal/net/spdy/SpdyConnectionTest.java b/src/test/java/com/squareup/okhttp/internal/net/spdy/SpdyConnectionTest.java index c8917b171..7877b2568 100644 --- a/src/test/java/com/squareup/okhttp/internal/net/spdy/SpdyConnectionTest.java +++ b/src/test/java/com/squareup/okhttp/internal/net/spdy/SpdyConnectionTest.java @@ -725,26 +725,102 @@ public final class SpdyConnectionTest { // write the mocking script peer.acceptFrame(); // SYN STREAM 1 peer.acceptFrame(); // GOAWAY - peer.sendFrame().synStream(0, 2, 0, 0, Arrays.asList("b", "banana")); // Should be ignored! peer.acceptFrame(); // PING + peer.sendFrame().synStream(0, 2, 0, 0, Arrays.asList("b", "banana")); // Should be ignored! peer.sendFrame().ping(0, 1); peer.play(); // play it back SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); connection.newStream(Arrays.asList("a", "android"), true, true); + Ping ping = connection.ping(); connection.shutdown(); - connection.ping().roundTripTime(); // Ensure that the SYN STREAM has been received. + ping.roundTripTime(); // Ensure that the SYN STREAM has been received. assertEquals(1, connection.openStreamCount()); // verify the peer received what was expected MockSpdyPeer.InFrame synStream1 = peer.takeFrame(); assertEquals(TYPE_SYN_STREAM, synStream1.type); + MockSpdyPeer.InFrame pingFrame = peer.takeFrame(); + assertEquals(TYPE_PING, pingFrame.type); MockSpdyPeer.InFrame goaway = peer.takeFrame(); assertEquals(TYPE_GOAWAY, goaway.type); assertEquals(0, goaway.streamId); - MockSpdyPeer.InFrame ping = peer.takeFrame(); - assertEquals(TYPE_PING, ping.type); + } + + @Test public void noPingsAfterShutdown() throws Exception { + // write the mocking script + peer.acceptFrame(); // GOAWAY + peer.play(); + + // play it back + SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); + connection.shutdown(); + try { + connection.ping(); + fail(); + } catch (IOException expected) { + assertEquals("shutdown", expected.getMessage()); + } + + // verify the peer received what was expected + MockSpdyPeer.InFrame goaway = peer.takeFrame(); + assertEquals(TYPE_GOAWAY, goaway.type); + } + + @Test public void close() throws Exception { + // write the mocking script + peer.acceptFrame(); // SYN STREAM + peer.acceptFrame(); // GOAWAY + peer.acceptFrame(); // RST STREAM + peer.play(); + + // play it back + SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); + SpdyStream stream = connection.newStream(Arrays.asList("a", "android"), true, true); + assertEquals(1, connection.openStreamCount()); + connection.close(); + assertEquals(0, connection.openStreamCount()); + try { + connection.newStream(Arrays.asList("b", "banana"), true, true); + fail(); + } catch (IOException expected) { + assertEquals("shutdown", expected.getMessage()); + } + try { + stream.getOutputStream().write(0); + fail(); + } catch (IOException expected) { + assertEquals("stream was reset: CANCEL", expected.getMessage()); + } + try { + stream.getInputStream().read(); + fail(); + } catch (IOException expected) { + assertEquals("stream was reset: CANCEL", expected.getMessage()); + } + + // verify the peer received what was expected + MockSpdyPeer.InFrame synStream = peer.takeFrame(); + assertEquals(TYPE_SYN_STREAM, synStream.type); + MockSpdyPeer.InFrame goaway = peer.takeFrame(); + assertEquals(TYPE_GOAWAY, goaway.type); + MockSpdyPeer.InFrame rstStream = peer.takeFrame(); + assertEquals(TYPE_RST_STREAM, rstStream.type); + assertEquals(1, rstStream.streamId); + } + + @Test public void closeCancelsPings() throws Exception { + // write the mocking script + peer.acceptFrame(); // PING + peer.acceptFrame(); // GOAWAY + peer.play(); + + // play it back + SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); + Ping ping = connection.ping(); + connection.close(); + assertEquals(-1, ping.roundTripTime()); } private void writeAndClose(SpdyStream stream, String data) throws IOException {