1
0
mirror of https://github.com/square/okhttp.git synced 2026-01-27 04:22:07 +03:00

Merge pull request #72 from square/jwilson/goaway

Implement GOAWAY, for clean shutdowns of SPDY connections.
This commit is contained in:
Jake Wharton
2012-12-30 23:43:35 -08:00
9 changed files with 205 additions and 32 deletions

View File

@@ -113,7 +113,7 @@ public final class ConnectionPool {
*/
public void recycle(Connection connection) {
if (connection.isSpdy()) {
throw new IllegalArgumentException(); // TODO: just 'return' here?
return;
}
try {

View File

@@ -503,7 +503,6 @@ public class HttpEngine {
requestHeaders.setHost(getOriginAddress(policy.getURL()));
}
// TODO: this shouldn't be set for SPDY (it's ignored)
if ((connection == null || connection.getHttpMinorVersion() != 0)
&& requestHeaders.getConnection() == null) {
requestHeaders.setConnection("Keep-Alive");
@@ -511,7 +510,6 @@ public class HttpEngine {
if (requestHeaders.getAcceptEncoding() == null) {
transparentGzip = true;
// TODO: this shouldn't be set for SPDY (it isn't necessary)
requestHeaders.setAcceptEncoding("gzip");
}

View File

@@ -393,6 +393,11 @@ public final class RawHeaders {
throw new IllegalArgumentException("Unexpected header: " + name + ": " + value);
}
// Drop headers that are ignored when layering HTTP over SPDY.
if (name.equals("connection") || name.equals("accept-encoding")) {
continue;
}
// If we haven't seen this name before, add the pair to the end of the list...
if (names.add(name)) {
result.add(name);

View File

@@ -25,6 +25,7 @@ import java.io.OutputStream;
import java.net.ProtocolException;
import java.net.Socket;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@@ -90,7 +91,9 @@ public final class SpdyConnection implements Closeable {
private final ExecutorService callbackExecutor;
private final Map<Integer, SpdyStream> streams = new HashMap<Integer, SpdyStream>();
private int lastGoodStreamId;
private int nextStreamId;
private boolean shutdown;
/** Lazily-created map of in-flight pings awaiting a response. Guarded by this. */
private Map<Integer, Ping> pings;
@@ -144,6 +147,9 @@ public final class SpdyConnection implements Closeable {
synchronized (spdyWriter) {
synchronized (this) {
if (shutdown) {
throw new IOException("shutdown");
}
streamId = nextStreamId;
nextStreamId += 2;
stream = new SpdyStream(streamId, this, requestHeaders, flags);
@@ -235,6 +241,23 @@ public final class SpdyConnection implements Closeable {
}
}
/**
* Degrades this connection such that new streams can neither be created
* locally, nor accepted from the remote peer. Existing streams are not
* impacted. This is intended to permit an endpoint to gracefully stop
* accepting new requests without harming previously established streams.
*/
public void shutdown() throws IOException {
synchronized (spdyWriter) {
int lastGoodStreamId;
synchronized (this) {
shutdown = true;
lastGoodStreamId = this.lastGoodStreamId;
}
spdyWriter.goAway(0, lastGoodStreamId);
}
}
@Override public void close() throws IOException {
close(null);
}
@@ -320,6 +343,10 @@ public final class SpdyConnection implements Closeable {
nameValueBlock, flags);
final SpdyStream previous;
synchronized (SpdyConnection.this) {
if (shutdown) {
return;
}
lastGoodStreamId = streamId;
previous = streams.put(streamId, synStream);
}
if (previous != null) {
@@ -385,5 +412,22 @@ public final class SpdyConnection implements Closeable {
}
}
}
@Override public void goAway(int flags, int lastGoodStreamId) {
synchronized (SpdyConnection.this) {
shutdown = true;
// Fail all streams created after the last good stream ID.
for (Iterator<Map.Entry<Integer, SpdyStream>> i = streams.entrySet().iterator();
i.hasNext();) {
Map.Entry<Integer, SpdyStream> entry = i.next();
int streamId = entry.getKey();
if (streamId > lastGoodStreamId && entry.getValue().isLocallyInitiated()) {
entry.getValue().receiveRstStream(SpdyStream.RST_REFUSED_STREAM);
i.remove();
}
}
}
}
}
}

View File

@@ -113,6 +113,9 @@ final class SpdyReader {
return true;
case SpdyConnection.TYPE_GOAWAY:
readGoAway(handler, flags, length);
return true;
case SpdyConnection.TYPE_HEADERS:
Streams.skipByReading(in, length);
throw new UnsupportedOperationException("TODO");
@@ -227,6 +230,12 @@ final class SpdyReader {
handler.ping(flags, id);
}
private void readGoAway(Handler handler, int flags, int length) throws IOException {
if (length != 4) throw ioException("TYPE_GOAWAY length: %d != 4", length);
int lastGoodStreamId = in.readInt() & 0x7fffffff;
handler.goAway(flags, lastGoodStreamId);
}
private void readSettings(Handler handler, int flags, int length) throws IOException {
int numberOfEntries = in.readInt();
if (length != 4 + 8 * numberOfEntries) {
@@ -259,7 +268,7 @@ final class SpdyReader {
void settings(int flags, Settings settings);
void noop();
void ping(int flags, int streamId);
// TODO: goaway
void goAway(int flags, int lastGoodStreamId);
// TODO: headers
}
}

View File

@@ -39,6 +39,17 @@ public final class SpdyStream {
private static final int DATA_FRAME_HEADER_LENGTH = 8;
private static final String[] STATUS_CODE_NAMES = {
null,
"PROTOCOL_ERROR",
"INVALID_STREAM",
"REFUSED_STREAM",
"UNSUPPORTED_VERSION",
"CANCEL",
"INTERNAL_ERROR",
"FLOW_CONTROL_ERROR",
};
public static final int RST_PROTOCOL_ERROR = 1;
public static final int RST_INVALID_STREAM = 2;
public static final int RST_REFUSED_STREAM = 3;
@@ -110,7 +121,7 @@ public final class SpdyStream {
if (responseHeaders != null) {
return responseHeaders;
}
throw new IOException("stream was reset: " + rstStatusCode);
throw new IOException("stream was reset: " + rstStatusString());
} catch (InterruptedException e) {
InterruptedIOException rethrow = new InterruptedIOException();
rethrow.initCause(e);
@@ -203,13 +214,13 @@ public final class SpdyStream {
private boolean closeInternal(int rstStatusCode) {
assert (!Thread.holdsLock(this));
synchronized (this) {
// TODO: no-op if inFinished == true and outFinished == true ?
if (this.rstStatusCode != -1) {
return false;
}
if (in.finished && out.finished) {
return false;
}
this.rstStatusCode = rstStatusCode;
in.finished = true;
out.finished = true;
notifyAll();
}
connection.removeStream(id);
@@ -241,12 +252,16 @@ public final class SpdyStream {
synchronized void receiveRstStream(int statusCode) {
if (rstStatusCode == -1) {
rstStatusCode = statusCode;
in.finished = true;
out.finished = true;
notifyAll();
}
}
private String rstStatusString() {
return rstStatusCode > 0 && rstStatusCode < STATUS_CODE_NAMES.length
? STATUS_CODE_NAMES[rstStatusCode]
: Integer.toString(rstStatusCode);
}
/**
* An input stream that reads the incoming data frames of a stream. Although
* this class uses synchronization to safely receive incoming data frames,
@@ -279,8 +294,8 @@ public final class SpdyStream {
private boolean closed;
/**
* True if either side has shut down this stream. We will receive no
* more bytes beyond those already in the buffer.
* True if either side has cleanly shut down this stream. We will
* receive no more bytes beyond those already in the buffer.
*/
private boolean finished;
@@ -303,16 +318,15 @@ public final class SpdyStream {
@Override public int read(byte[] b, int offset, int count) throws IOException {
synchronized (SpdyStream.this) {
checkNotClosed();
checkOffsetAndCount(b.length, offset, count);
while (pos == -1 && !finished) {
while (pos == -1 && !finished && !closed && rstStatusCode == -1) {
try {
SpdyStream.this.wait();
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
}
checkNotClosed();
if (pos == -1) {
return -1;
@@ -407,6 +421,7 @@ public final class SpdyStream {
@Override public void close() throws IOException {
synchronized (SpdyStream.this) {
closed = true;
SpdyStream.this.notifyAll();
}
cancelStreamIfNecessary();
}
@@ -415,22 +430,22 @@ public final class SpdyStream {
if (closed) {
throw new IOException("stream closed");
}
if (rstStatusCode != -1) {
throw new IOException("stream was reset: " + rstStatusString());
}
}
}
private void cancelStreamIfNecessary() throws IOException {
assert (!Thread.holdsLock(SpdyStream.this));
synchronized (this) {
if (in.closed && !in.finished && (out.finished || out.closed)) {
// RST this stream to prevent additional data from being sent. This is safe because
// the input stream is closed (we won't use any further bytes) and the output stream
// is either finished or closed (so RSTing both streams won't cause harm).
in.finished = true;
} else {
// We shouldn't cancel this stream.
return;
if (!in.closed || in.finished || (!out.finished && !out.closed)) {
return; // We shouldn't cancel this stream (or don't need to).
}
}
// RST this stream to prevent additional data from being sent. This is safe because
// the input stream is closed (we won't use any further bytes) and the output stream
// is either finished or closed (so RSTing both streams doesn't cause harm).
SpdyStream.this.close(RST_CANCEL);
}
@@ -446,8 +461,8 @@ public final class SpdyStream {
private boolean closed;
/**
* True if either side has shut down this stream. We shall send no more
* bytes.
* True if either side has cleanly shut down this stream. We shall send
* no more bytes.
*/
private boolean finished;
@@ -511,10 +526,10 @@ public final class SpdyStream {
synchronized (SpdyStream.this) {
if (closed) {
throw new IOException("stream closed");
}
if (finished) {
throw new IOException("output stream finished "
+ "(RST status code=" + rstStatusCode + ")");
} else if (finished) {
throw new IOException("stream finished");
} else if (rstStatusCode != -1) {
throw new IOException("stream was reset: " + rstStatusString());
}
}
}

View File

@@ -140,4 +140,13 @@ final class SpdyWriter {
out.writeInt(id);
out.flush();
}
public synchronized void goAway(int flags, int lastGoodStreamId) throws IOException {
int type = SpdyConnection.TYPE_GOAWAY;
int length = 4;
out.writeInt(0x80000000 | (SpdyConnection.VERSION & 0x7fff) << 16 | type & 0xffff);
out.writeInt((flags & 0xff) << 24 | length & 0xffffff);
out.writeInt(lastGoodStreamId);
out.flush();
}
}

View File

@@ -81,6 +81,7 @@ public final class MockSpdyPeer {
Socket socket = serverSocket.accept();
OutputStream out = socket.getOutputStream();
InputStream in = socket.getInputStream();
SpdyReader reader = new SpdyReader(in);
Iterator<OutFrame> outFramesIterator = outFrames.iterator();
byte[] outBytes = bytesOut.toByteArray();
@@ -106,7 +107,6 @@ public final class MockSpdyPeer {
} else {
// read a frame
SpdyReader reader = new SpdyReader(in);
InFrame inFrame = new InFrame(i, reader);
reader.nextFrame(inFrame);
inFrames.add(inFrame);
@@ -201,5 +201,12 @@ public final class MockSpdyPeer {
if (this.type != -1) throw new IllegalStateException();
this.type = SpdyConnection.TYPE_NOOP;
}
@Override public void goAway(int flags, int lastGoodStreamId) {
if (this.type != -1) throw new IllegalStateException();
this.type = SpdyConnection.TYPE_GOAWAY;
this.flags = flags;
this.streamId = lastGoodStreamId;
}
}
}

View File

@@ -20,6 +20,7 @@ import static com.squareup.okhttp.internal.Util.UTF_8;
import static com.squareup.okhttp.internal.net.spdy.Settings.PERSIST_VALUE;
import static com.squareup.okhttp.internal.net.spdy.SpdyConnection.FLAG_FIN;
import static com.squareup.okhttp.internal.net.spdy.SpdyConnection.TYPE_DATA;
import static com.squareup.okhttp.internal.net.spdy.SpdyConnection.TYPE_GOAWAY;
import static com.squareup.okhttp.internal.net.spdy.SpdyConnection.TYPE_NOOP;
import static com.squareup.okhttp.internal.net.spdy.SpdyConnection.TYPE_PING;
import static com.squareup.okhttp.internal.net.spdy.SpdyConnection.TYPE_RST_STREAM;
@@ -331,6 +332,7 @@ public final class SpdyConnectionTest {
out.write("round".getBytes(UTF_8));
fail();
} catch (Exception expected) {
assertEquals("stream closed", expected.getMessage());
}
// verify the peer received what was expected
@@ -368,6 +370,7 @@ public final class SpdyConnectionTest {
out.write("square".getBytes(UTF_8));
fail();
} catch (IOException expected) {
assertEquals("stream was reset: CANCEL", expected.getMessage());
}
out.close();
@@ -402,11 +405,13 @@ public final class SpdyConnectionTest {
in.read();
fail();
} catch (IOException expected) {
assertEquals("stream closed", expected.getMessage());
}
try {
out.write('a');
fail();
} catch (IOException expected) {
assertEquals("stream finished", expected.getMessage());
}
// verify the peer received what was expected
@@ -443,6 +448,7 @@ public final class SpdyConnectionTest {
in.read();
fail();
} catch (IOException expected) {
assertEquals("stream closed", expected.getMessage());
}
out.write("square".getBytes(UTF_8));
out.flush();
@@ -490,19 +496,29 @@ public final class SpdyConnectionTest {
// write the mocking script
peer.acceptFrame();
peer.sendFrame().synReply(0, 1, Arrays.asList("a", "android"));
peer.acceptFrame(); // PING
peer.sendFrame().synReply(0, 1, Arrays.asList("b", "banana"));
peer.acceptFrame();
peer.sendFrame().ping(0, 1);
peer.acceptFrame(); // RST STREAM
peer.play();
// play it back
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build();
SpdyStream stream = connection.newStream(Arrays.asList("c", "cola"), true, true);
assertEquals(Arrays.asList("a", "android"), stream.getResponseHeaders());
assertStreamData("", stream.getInputStream());
connection.ping().roundTripTime(); // Ensure that the 2nd SYN REPLY has been received.
try {
stream.getInputStream().read();
fail();
} catch (IOException e) {
assertEquals("stream was reset: PROTOCOL_ERROR", e.getMessage());
}
// verify the peer received what was expected
MockSpdyPeer.InFrame synStream = peer.takeFrame();
assertEquals(TYPE_SYN_STREAM, synStream.type);
MockSpdyPeer.InFrame ping = peer.takeFrame();
assertEquals(TYPE_PING, ping.type);
MockSpdyPeer.InFrame rstStream = peer.takeFrame();
assertEquals(TYPE_RST_STREAM, rstStream.type);
assertEquals(1, rstStream.streamId);
@@ -611,6 +627,7 @@ public final class SpdyConnectionTest {
stream.getResponseHeaders();
fail();
} catch (IOException expected) {
assertEquals("stream was reset: REFUSED_STREAM", expected.getMessage());
}
// verify the peer received what was expected
@@ -622,6 +639,75 @@ public final class SpdyConnectionTest {
assertEquals(0, ping.flags);
}
@Test public void receiveGoAway() throws Exception {
// write the mocking script
peer.acceptFrame(); // SYN STREAM 1
peer.acceptFrame(); // SYN STREAM 3
peer.sendFrame().goAway(0, 1);
peer.acceptFrame(); // PING
peer.sendFrame().ping(0, 1);
peer.acceptFrame(); // DATA STREAM 1
peer.play();
// play it back
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build();
SpdyStream stream1 = connection.newStream(Arrays.asList("a", "android"), true, true);
SpdyStream stream2 = connection.newStream(Arrays.asList("b", "banana"), true, true);
connection.ping().roundTripTime(); // Ensure that the GO_AWAY has been received.
stream1.getOutputStream().write("abc".getBytes(UTF_8));
try {
stream2.getOutputStream().write("abc".getBytes(UTF_8));
fail();
} catch (IOException expected) {
assertEquals("stream was reset: REFUSED_STREAM", expected.getMessage());
}
stream1.getOutputStream().write("def".getBytes(UTF_8));
stream1.getOutputStream().close();
try {
connection.newStream(Arrays.asList("c", "cola"), true, true);
fail();
} catch (IOException expected) {
assertEquals("shutdown", expected.getMessage());
}
// verify the peer received what was expected
MockSpdyPeer.InFrame synStream1 = peer.takeFrame();
assertEquals(TYPE_SYN_STREAM, synStream1.type);
MockSpdyPeer.InFrame synStream2 = peer.takeFrame();
assertEquals(TYPE_SYN_STREAM, synStream2.type);
MockSpdyPeer.InFrame ping = peer.takeFrame();
assertEquals(TYPE_PING, ping.type);
MockSpdyPeer.InFrame data1 = peer.takeFrame();
assertEquals(TYPE_DATA, data1.type);
assertEquals(1, data1.streamId);
assertTrue(Arrays.equals("abcdef".getBytes("UTF-8"), data1.data));
}
@Test public void sendGoAway() throws Exception {
// write the mocking script
peer.acceptFrame(); // SYN STREAM 1
peer.acceptFrame(); // GOAWAY
peer.sendFrame().synStream(0, 2, 0, 0, Arrays.asList("b", "banana")); // Should be ignored!
peer.acceptFrame(); // PING
peer.sendFrame().ping(0, 1);
peer.play();
// play it back
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build();
connection.newStream(Arrays.asList("a", "android"), true, true);
connection.shutdown();
connection.ping().roundTripTime(); // Ensure that the SYN STREAM has been received.
// verify the peer received what was expected
MockSpdyPeer.InFrame synStream1 = peer.takeFrame();
assertEquals(TYPE_SYN_STREAM, synStream1.type);
MockSpdyPeer.InFrame goaway = peer.takeFrame();
assertEquals(TYPE_GOAWAY, goaway.type);
assertEquals(0, goaway.streamId);
MockSpdyPeer.InFrame ping = peer.takeFrame();
assertEquals(TYPE_PING, ping.type);
}
private void writeAndClose(SpdyStream stream, String data) throws IOException {
OutputStream out = stream.getOutputStream();
out.write(data.getBytes("UTF-8"));