1
0
mirror of https://github.com/square/okhttp.git synced 2026-01-27 04:22:07 +03:00

Merge pull request #74 from square/jwilson/close

Implement SpdyConnection.close().
This commit is contained in:
Jake Wharton
2012-12-31 20:28:18 -08:00
6 changed files with 176 additions and 29 deletions

View File

@@ -39,6 +39,29 @@ public final class IoUtils {
}
}
/**
* Closes {@code a} and {@code b}. If either close fails, this completes
* the other close and rethrows the first encountered exception.
*/
public static void closeAll(Closeable a, Closeable b) throws IOException {
Throwable thrown = null;
try {
a.close();
} catch (Throwable e) {
thrown = e;
}
try {
b.close();
} catch (Throwable e) {
if (thrown == null) thrown = e;
}
if (thrown == null) return;
if (thrown instanceof IOException) throw (IOException) thrown;
if (thrown instanceof RuntimeException) throw (RuntimeException) thrown;
if (thrown instanceof Error) throw (Error) thrown;
throw new AssertionError(thrown);
}
/**
* Closes 'socket', ignoring any exceptions. Does nothing if 'socket' is null.
*/

View File

@@ -40,9 +40,16 @@ public final class Ping {
latch.countDown();
}
void cancel() {
if (received != -1 || sent == -1) throw new IllegalStateException();
received = sent - 1;
latch.countDown();
}
/**
* Returns the round trip time for this ping in nanoseconds, waiting for the
* response to arrive if necessary.
* response to arrive if necessary. Returns -1 if the response was
* cancelled.
*/
public long roundTripTime() throws InterruptedException {
latch.await();
@@ -51,13 +58,14 @@ public final class Ping {
/**
* Returns the round trip time for this ping in nanoseconds, or -1 if the
* timeout elapsed before the round trip completed.
* response was cancelled, or -2 if the timeout elapsed before the round
* trip completed.
*/
public long roundTripTime(long timeout, TimeUnit unit) throws InterruptedException {
if (latch.await(timeout, unit)) {
return received - sent;
} else {
return -1;
return -2;
}
}
}

View File

@@ -16,6 +16,7 @@
package com.squareup.okhttp.internal.net.spdy;
import com.squareup.okhttp.internal.io.IoUtils;
import com.squareup.okhttp.internal.io.Streams;
import static com.squareup.okhttp.internal.net.spdy.Threads.newThreadFactory;
import java.io.Closeable;
@@ -206,6 +207,9 @@ public final class SpdyConnection implements Closeable {
Ping ping = new Ping();
int pingId;
synchronized (this) {
if (shutdown) {
throw new IOException("shutdown");
}
pingId = nextPingId;
nextPingId += 2;
if (pings == null) pings = new HashMap<Integer, Ping>();
@@ -261,6 +265,9 @@ public final class SpdyConnection implements Closeable {
synchronized (spdyWriter) {
int lastGoodStreamId;
synchronized (this) {
if (shutdown) {
return;
}
shutdown = true;
lastGoodStreamId = this.lastGoodStreamId;
}
@@ -268,20 +275,46 @@ public final class SpdyConnection implements Closeable {
}
}
/**
* Closes this connection. This cancels all open streams and unanswered
* pings. It closes the underlying input and output streams and shuts down
* internal executor services.
*/
@Override public void close() throws IOException {
close(null);
}
shutdown();
private synchronized void close(Throwable reason) throws IOException {
if (reason != null) {
reason.printStackTrace();
SpdyStream[] streamsToClose = null;
Ping[] pingsToCancel = null;
synchronized (this) {
if (!streams.isEmpty()) {
streamsToClose = streams.values().toArray(new SpdyStream[streams.size()]);
streams.clear();
}
if (pings != null) {
pingsToCancel = pings.values().toArray(new Ping[pings.size()]);
pings = null;
}
}
// TODO: forward 'reason' to forced closed streams?
// TODO: graceful close; send RST frames
// TODO: close all streams to release waiting readers
if (streamsToClose != null) {
for (SpdyStream stream : streamsToClose) {
try {
stream.close(SpdyStream.RST_CANCEL);
} catch (Throwable ignored) {
}
}
}
if (pingsToCancel != null) {
for (Ping ping : pingsToCancel) {
ping.cancel();
}
}
writeExecutor.shutdown();
readExecutor.shutdown();
callbackExecutor.shutdown();
readExecutor.shutdown();
IoUtils.closeAll(spdyReader, spdyWriter);
}
public static class Builder {
@@ -320,17 +353,13 @@ public final class SpdyConnection implements Closeable {
private class Reader implements Runnable, SpdyReader.Handler {
@Override public void run() {
Throwable failure = null;
try {
while (spdyReader.nextFrame(this)) {
}
} catch (Throwable e) {
failure = e;
}
try {
close(failure);
} catch (IOException ignored) {
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
IoUtils.closeQuietly(SpdyConnection.this);
}
}

View File

@@ -16,9 +16,10 @@
package com.squareup.okhttp.internal.net.spdy;
import com.squareup.okhttp.internal.io.IoUtils;
import com.squareup.okhttp.internal.io.Streams;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
@@ -32,7 +33,7 @@ import java.util.zip.InflaterInputStream;
/**
* Read version 2 SPDY frames.
*/
final class SpdyReader {
final class SpdyReader implements Closeable {
private static final String DICTIONARY_STRING = ""
+ "optionsgetheadpostputdeletetraceacceptaccept-charsetaccept-encodingaccept-"
+ "languageauthorizationexpectfromhostif-modified-sinceif-matchif-none-matchi"
@@ -73,8 +74,8 @@ final class SpdyReader {
int w1;
try {
w1 = in.readInt();
} catch (EOFException e) {
return false;
} catch (IOException e) {
return false; // This might be a normal socket close.
}
int w2 = in.readInt();
@@ -259,6 +260,10 @@ final class SpdyReader {
throw new IOException(String.format(message, args));
}
@Override public void close() throws IOException {
IoUtils.closeAll(in, nameValueBlockIn);
}
public interface Handler {
void data(int flags, int streamId, InputStream in, int length) throws IOException;
void synStream(int flags, int streamId, int associatedStreamId, int priority,

View File

@@ -17,7 +17,9 @@
package com.squareup.okhttp.internal.net.spdy;
import com.squareup.okhttp.internal.Platform;
import com.squareup.okhttp.internal.io.IoUtils;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
@@ -27,7 +29,7 @@ import java.util.zip.Deflater;
/**
* Write version 2 SPDY frames.
*/
final class SpdyWriter {
final class SpdyWriter implements Closeable {
final DataOutputStream out;
private final ByteArrayOutputStream nameValueBlockBuffer;
private final DataOutputStream nameValueBlockOut;
@@ -149,4 +151,8 @@ final class SpdyWriter {
out.writeInt(lastGoodStreamId);
out.flush();
}
@Override public void close() throws IOException {
IoUtils.closeAll(out, nameValueBlockOut);
}
}

View File

@@ -725,26 +725,102 @@ public final class SpdyConnectionTest {
// write the mocking script
peer.acceptFrame(); // SYN STREAM 1
peer.acceptFrame(); // GOAWAY
peer.sendFrame().synStream(0, 2, 0, 0, Arrays.asList("b", "banana")); // Should be ignored!
peer.acceptFrame(); // PING
peer.sendFrame().synStream(0, 2, 0, 0, Arrays.asList("b", "banana")); // Should be ignored!
peer.sendFrame().ping(0, 1);
peer.play();
// play it back
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build();
connection.newStream(Arrays.asList("a", "android"), true, true);
Ping ping = connection.ping();
connection.shutdown();
connection.ping().roundTripTime(); // Ensure that the SYN STREAM has been received.
ping.roundTripTime(); // Ensure that the SYN STREAM has been received.
assertEquals(1, connection.openStreamCount());
// verify the peer received what was expected
MockSpdyPeer.InFrame synStream1 = peer.takeFrame();
assertEquals(TYPE_SYN_STREAM, synStream1.type);
MockSpdyPeer.InFrame pingFrame = peer.takeFrame();
assertEquals(TYPE_PING, pingFrame.type);
MockSpdyPeer.InFrame goaway = peer.takeFrame();
assertEquals(TYPE_GOAWAY, goaway.type);
assertEquals(0, goaway.streamId);
MockSpdyPeer.InFrame ping = peer.takeFrame();
assertEquals(TYPE_PING, ping.type);
}
@Test public void noPingsAfterShutdown() throws Exception {
// write the mocking script
peer.acceptFrame(); // GOAWAY
peer.play();
// play it back
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build();
connection.shutdown();
try {
connection.ping();
fail();
} catch (IOException expected) {
assertEquals("shutdown", expected.getMessage());
}
// verify the peer received what was expected
MockSpdyPeer.InFrame goaway = peer.takeFrame();
assertEquals(TYPE_GOAWAY, goaway.type);
}
@Test public void close() throws Exception {
// write the mocking script
peer.acceptFrame(); // SYN STREAM
peer.acceptFrame(); // GOAWAY
peer.acceptFrame(); // RST STREAM
peer.play();
// play it back
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build();
SpdyStream stream = connection.newStream(Arrays.asList("a", "android"), true, true);
assertEquals(1, connection.openStreamCount());
connection.close();
assertEquals(0, connection.openStreamCount());
try {
connection.newStream(Arrays.asList("b", "banana"), true, true);
fail();
} catch (IOException expected) {
assertEquals("shutdown", expected.getMessage());
}
try {
stream.getOutputStream().write(0);
fail();
} catch (IOException expected) {
assertEquals("stream was reset: CANCEL", expected.getMessage());
}
try {
stream.getInputStream().read();
fail();
} catch (IOException expected) {
assertEquals("stream was reset: CANCEL", expected.getMessage());
}
// verify the peer received what was expected
MockSpdyPeer.InFrame synStream = peer.takeFrame();
assertEquals(TYPE_SYN_STREAM, synStream.type);
MockSpdyPeer.InFrame goaway = peer.takeFrame();
assertEquals(TYPE_GOAWAY, goaway.type);
MockSpdyPeer.InFrame rstStream = peer.takeFrame();
assertEquals(TYPE_RST_STREAM, rstStream.type);
assertEquals(1, rstStream.streamId);
}
@Test public void closeCancelsPings() throws Exception {
// write the mocking script
peer.acceptFrame(); // PING
peer.acceptFrame(); // GOAWAY
peer.play();
// play it back
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build();
Ping ping = connection.ping();
connection.close();
assertEquals(-1, ping.roundTripTime());
}
private void writeAndClose(SpdyStream stream, String data) throws IOException {