From 6e382aa9b807c61adc88eb0a2cddc047382e36d9 Mon Sep 17 00:00:00 2001 From: jwilson Date: Sat, 17 Aug 2013 20:01:33 -0400 Subject: [PATCH] Read HTTP/2.0 frames and error codes. This introduces a new ErrorCode class that identifies codes for HTTP/2.0, SPDY/3 RST codes, and SPDY/3 GO_AWAY codes. --- .../com/squareup/okhttp/internal/Util.java | 2 + .../okhttp/internal/spdy/ErrorCode.java | 67 ++++++++ .../okhttp/internal/spdy/FrameReader.java | 9 +- .../okhttp/internal/spdy/FrameWriter.java | 6 +- .../okhttp/internal/spdy/Http20Draft04.java | 156 ++++++++++++++++-- .../internal/spdy/IncomingStreamHandler.java | 2 +- .../okhttp/internal/spdy/Settings.java | 31 +++- .../squareup/okhttp/internal/spdy/Spdy3.java | 57 ++++--- .../okhttp/internal/spdy/SpdyConnection.java | 94 ++++++----- .../okhttp/internal/spdy/SpdyStream.java | 96 ++++------- .../okhttp/internal/spdy/Variant.java | 4 +- .../okhttp/internal/spdy/MockSpdyPeer.java | 25 +-- .../internal/spdy/SpdyConnectionTest.java | 102 ++++++------ .../okhttp/internal/http/SpdyTransport.java | 3 +- 14 files changed, 434 insertions(+), 220 deletions(-) create mode 100644 okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/ErrorCode.java diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/Util.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/Util.java index 0ce7f8aba..d1b285708 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/Util.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/Util.java @@ -262,6 +262,8 @@ public final class Util { * buffer. */ public static long skipByReading(InputStream in, long byteCount) throws IOException { + if (byteCount == 0) return 0L; + // acquire the shared skip buffer. byte[] buffer = skipBuffer.getAndSet(null); if (buffer == null) { diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/ErrorCode.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/ErrorCode.java new file mode 100644 index 000000000..d3a32e117 --- /dev/null +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/ErrorCode.java @@ -0,0 +1,67 @@ +package com.squareup.okhttp.internal.spdy; + +public enum ErrorCode { + /** Not an error! For SPDY stream resets, prefer null over NO_ERROR. */ + NO_ERROR(0, -1, 0), + + PROTOCOL_ERROR(1, 1, 1), + + /** A subtype of PROTOCOL_ERROR used by SPDY. */ + INVALID_STREAM(1, 2, -1), + + /** A subtype of PROTOCOL_ERROR used by SPDY. */ + UNSUPPORTED_VERSION(1, 4, -1), + + /** A subtype of PROTOCOL_ERROR used by SPDY. */ + STREAM_IN_USE(1, 8, -1), + + /** A subtype of PROTOCOL_ERROR used by SPDY. */ + STREAM_ALREADY_CLOSED(1, 9, -1), + + INTERNAL_ERROR(2, 6, 2), + + FLOW_CONTROL_ERROR(3, 7, -1), + + STREAM_CLOSED(5, -1, -1), + + FRAME_TOO_LARGE(6, 11, -1), + + REFUSED_STREAM(7, 3, -1), + + CANCEL(8, 5, -1), + + COMPRESSION_ERROR(9, -1, -1), + + INVALID_CREDENTIALS(-1, 10, -1); + + public final int httpCode; + public final int spdyRstCode; + public final int spdyGoAwayCode; + + private ErrorCode(int httpCode, int spdyRstCode, int spdyGoAwayCode) { + this.httpCode = httpCode; + this.spdyRstCode = spdyRstCode; + this.spdyGoAwayCode = spdyGoAwayCode; + } + + public static ErrorCode fromSpdy3Rst(int code) { + for (ErrorCode errorCode : ErrorCode.values()) { + if (errorCode.spdyRstCode == code) return errorCode; + } + return null; + } + + public static ErrorCode fromHttp2(int code) { + for (ErrorCode errorCode : ErrorCode.values()) { + if (errorCode.httpCode == code) return errorCode; + } + return null; + } + + public static ErrorCode fromSpdyGoAway(int code) { + for (ErrorCode errorCode : ErrorCode.values()) { + if (errorCode.spdyGoAwayCode == code) return errorCode; + } + return null; + } +} diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/FrameReader.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/FrameReader.java index cfd1f49a0..f47561ee2 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/FrameReader.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/FrameReader.java @@ -31,11 +31,12 @@ public interface FrameReader extends Closeable { int priority, int slot, List nameValueBlock); void synReply(boolean inFinished, int streamId, List nameValueBlock) throws IOException; void headers(int streamId, List nameValueBlock) throws IOException; - void rstStream(int streamId, int statusCode); + void rstStream(int streamId, ErrorCode errorCode); void settings(boolean clearPrevious, Settings settings); void noop(); - void ping(int streamId); - void goAway(int lastGoodStreamId, int statusCode); - void windowUpdate(int streamId, int deltaWindowSize); + void ping(boolean reply, int payload1, int payload2); + void goAway(int lastGoodStreamId, ErrorCode errorCode); + void windowUpdate(int streamId, int deltaWindowSize, boolean endFlowControl); + void priority(int streamId, int priority); } } diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/FrameWriter.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/FrameWriter.java index 3bf7147b7..0c694e361 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/FrameWriter.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/FrameWriter.java @@ -31,13 +31,13 @@ public interface FrameWriter extends Closeable { int priority, int slot, List nameValueBlock) throws IOException; void synReply(boolean outFinished, int streamId, List nameValueBlock) throws IOException; void headers(int streamId, List nameValueBlock) throws IOException; - void rstStream(int streamId, int statusCode) throws IOException; + void rstStream(int streamId, ErrorCode errorCode) throws IOException; void data(boolean outFinished, int streamId, byte[] data) throws IOException; void data(boolean outFinished, int streamId, byte[] data, int offset, int byteCount) throws IOException; void settings(Settings settings) throws IOException; void noop() throws IOException; - void ping(int id) throws IOException; - void goAway(int lastGoodStreamId, int statusCode) throws IOException; + void ping(boolean reply, int payload1, int payload2) throws IOException; + void goAway(int lastGoodStreamId, ErrorCode errorCode) throws IOException; void windowUpdate(int streamId, int deltaWindowSize) throws IOException; } diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Http20Draft04.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Http20Draft04.java index d0f059429..7dd229505 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Http20Draft04.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Http20Draft04.java @@ -15,6 +15,7 @@ */ package com.squareup.okhttp.internal.spdy; +import com.squareup.okhttp.internal.Util; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; @@ -23,11 +24,27 @@ import java.io.OutputStream; import java.util.List; final class Http20Draft04 implements Variant { - @Override public FrameReader newReader(InputStream in) { + static final int TYPE_DATA = 0x0; + static final int TYPE_HEADERS = 0x1; + static final int TYPE_PRIORITY = 0x2; + static final int TYPE_RST_STREAM = 0x3; + static final int TYPE_SETTINGS = 0x4; + static final int TYPE_PUSH_PROMISE = 0x5; + static final int TYPE_PING = 0x6; + static final int TYPE_GOAWAY = 0x7; + static final int TYPE_WINDOW_UPDATE = 0x9; + + static final int FLAG_END_STREAM = 0x1; + static final int FLAG_END_HEADERS = 0x4; + static final int FLAG_PRIORITY = 0x8; + static final int FLAG_PONG = 0x1; + static final int FLAG_END_FLOW_CONTROL = 0x1; + + @Override public FrameReader newReader(InputStream in, boolean client) { return new Reader(in); } - @Override public FrameWriter newWriter(OutputStream out) { + @Override public FrameWriter newWriter(OutputStream out, boolean client) { return new Writer(out); } @@ -50,12 +67,130 @@ final class Http20Draft04 implements Variant { int length = w1 & 0xffff; int type = (w1 & 0xff0000) >> 16; int flags = (w1 & 0xff000000) >> 24; - boolean r = (w2 & 0x80000000) != 0; + // boolean r = (w2 & 0x80000000) != 0; // Reserved. int streamId = (w2 & 0x7fffffff); + switch (type) { + case TYPE_DATA: + readData(handler, flags, length, streamId); + return true; + + case TYPE_PRIORITY: + readPriority(handler, flags, length, streamId); + return true; + + case TYPE_RST_STREAM: + readRstStream(handler, flags, length, streamId); + return true; + + case TYPE_SETTINGS: + readSettings(handler, flags, length, streamId); + return true; + + case TYPE_PUSH_PROMISE: + readPushPromise(handler, flags, length, streamId); + return true; + + case TYPE_PING: + readPing(handler, flags, length, streamId); + return true; + + case TYPE_GOAWAY: + readGoAway(handler, flags, length, streamId); + return true; + + case TYPE_WINDOW_UPDATE: + readWindowUpdate(handler, flags, length, streamId); + return true; + } + throw new UnsupportedOperationException("TODO"); } + private void readData(Handler handler, int flags, int length, int streamId) throws IOException { + boolean inFinished = (flags & FLAG_END_STREAM) != 0; + handler.data(inFinished, streamId, in, length); + } + + private void readPriority(Handler handler, int flags, int length, int streamId) + throws IOException { + if (length != 4) throw ioException("TYPE_PRIORITY length: %d != 4", length); + if (streamId == 0) throw ioException("TYPE_PRIORITY streamId == 0"); + int w1 = in.readInt(); + // boolean r = (w1 & 0x80000000) != 0; // Reserved. + int priority = (w1 & 0x7fffffff); + handler.priority(streamId, priority); + } + + private void readRstStream(Handler handler, int flags, int length, int streamId) + throws IOException { + if (length != 4) throw ioException("TYPE_RST_STREAM length: %d != 4", length); + if (streamId == 0) throw ioException("TYPE_RST_STREAM streamId == 0"); + int errorCodeInt = in.readInt(); + ErrorCode errorCode = ErrorCode.fromHttp2(errorCodeInt); + if (errorCode == null) { + throw ioException("TYPE_RST_STREAM unexpected error code: %d", errorCodeInt); + } + handler.rstStream(streamId, errorCode); + } + + private void readSettings(Handler handler, int flags, int length, int streamId) + throws IOException { + if (length % 8 != 0) throw ioException("TYPE_SETTINGS length %% 8 != 0: %s", length); + if (streamId != 0) throw ioException("TYPE_SETTINGS streamId != 0"); + Settings settings = new Settings(); + for (int i = 0; i < length; i += 8) { + int w1 = in.readInt(); + int value = in.readInt(); + // int r = (w1 & 0xff000000) >>> 24; // Reserved. + int id = w1 & 0xffffff; + settings.set(id, 0, value); + } + handler.settings(false, settings); + } + + private void readPushPromise(Handler handler, int flags, int length, int streamId) { + // TODO: + } + + private void readPing(Handler handler, int flags, int length, int streamId) throws IOException { + if (length != 8) throw ioException("TYPE_PING length != 8: %s", length); + if (streamId != 0) throw ioException("TYPE_PING streamId != 0"); + int payload1 = in.readInt(); + int payload2 = in.readInt(); + boolean reply = (flags & FLAG_PONG) != 0; + handler.ping(reply, payload1, payload2); + } + + private void readGoAway(Handler handler, int flags, int length, int streamId) + throws IOException { + if (length < 8) throw ioException("TYPE_GOAWAY length < 8: %s", length); + int lastStreamId = in.readInt(); + int errorCodeInt = in.readInt(); + int opaqueDataLength = length - 8; + ErrorCode errorCode = ErrorCode.fromHttp2(errorCodeInt); + if (errorCode == null) { + throw ioException("TYPE_RST_STREAM unexpected error code: %d", errorCodeInt); + } + if (Util.skipByReading(in, opaqueDataLength) != opaqueDataLength) { + throw new IOException("TYPE_GOAWAY opaque data was truncated"); + } + handler.goAway(lastStreamId, errorCode); + } + + private void readWindowUpdate(Handler handler, int flags, int length, int streamId) + throws IOException { + int w1 = in.readInt(); + // boolean r = (w1 & 0x80000000) != 0; // Reserved. + int windowSizeIncrement = (w1 & 0x7fffffff); + boolean endFlowControl = (flags & FLAG_END_FLOW_CONTROL) != 0; + handler.windowUpdate(streamId, windowSizeIncrement, endFlowControl); + } + + private static IOException ioException(String message, Object... args) throws IOException { + throw new IOException(String.format(message, args)); + } + @Override public void close() throws IOException { in.close(); } @@ -92,17 +227,17 @@ final class Http20Draft04 implements Variant { throw new UnsupportedOperationException("TODO"); } - @Override public synchronized void rstStream(int streamId, int statusCode) throws IOException { + @Override public synchronized void rstStream(int streamId, ErrorCode errorCode) + throws IOException { throw new UnsupportedOperationException("TODO"); } - @Override public synchronized void data(boolean outFinished, int streamId, byte[] data) - throws IOException { + @Override public void data(boolean outFinished, int streamId, byte[] data) throws IOException { data(outFinished, streamId, data, 0, data.length); } - @Override public void data(boolean outFinished, int streamId, byte[] data, int offset, - int byteCount) throws IOException { + @Override public synchronized void data(boolean outFinished, int streamId, byte[] data, + int offset, int byteCount) throws IOException { throw new UnsupportedOperationException("TODO"); } @@ -114,11 +249,12 @@ final class Http20Draft04 implements Variant { throw new UnsupportedOperationException("TODO"); } - @Override public synchronized void ping(int id) throws IOException { + @Override public synchronized void ping(boolean reply, int payload1, int payload2) + throws IOException { throw new UnsupportedOperationException("TODO"); } - @Override public synchronized void goAway(int lastGoodStreamId, int statusCode) + @Override public synchronized void goAway(int lastGoodStreamId, ErrorCode errorCode) throws IOException { throw new UnsupportedOperationException("TODO"); } diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/IncomingStreamHandler.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/IncomingStreamHandler.java index 875fff0fd..44d4ea2bf 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/IncomingStreamHandler.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/IncomingStreamHandler.java @@ -22,7 +22,7 @@ import java.io.IOException; public interface IncomingStreamHandler { IncomingStreamHandler REFUSE_INCOMING_STREAMS = new IncomingStreamHandler() { @Override public void receive(SpdyStream stream) throws IOException { - stream.close(SpdyStream.RST_REFUSED_STREAM); + stream.close(ErrorCode.REFUSED_STREAM); } }; diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Settings.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Settings.java index 774d79121..05380e27e 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Settings.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Settings.java @@ -31,23 +31,29 @@ final class Settings { static final int PERSISTED = 0x2; /** Sender's estimate of max incoming kbps. */ - static final int UPLOAD_BANDWIDTH = 0x1; + static final int UPLOAD_BANDWIDTH = 1; /** Sender's estimate of max outgoing kbps. */ - static final int DOWNLOAD_BANDWIDTH = 0x2; + static final int DOWNLOAD_BANDWIDTH = 2; /** Sender's estimate of milliseconds between sending a request and receiving a response. */ - static final int ROUND_TRIP_TIME = 0x3; + static final int ROUND_TRIP_TIME = 3; /** Sender's maximum number of concurrent streams. */ - static final int MAX_CONCURRENT_STREAMS = 0x4; + static final int MAX_CONCURRENT_STREAMS = 4; /** Current CWND in Packets. */ - static final int CURRENT_CWND = 0x5; + static final int CURRENT_CWND = 5; /** Retransmission rate. Percentage */ - static final int DOWNLOAD_RETRANS_RATE = 0x6; + static final int DOWNLOAD_RETRANS_RATE = 6; /** Window size in bytes. */ - static final int INITIAL_WINDOW_SIZE = 0x7; + static final int INITIAL_WINDOW_SIZE = 7; /** Window size in bytes. */ - static final int CLIENT_CERTIFICATE_VECTOR_SIZE = 0x8; + static final int CLIENT_CERTIFICATE_VECTOR_SIZE = 8; + /** Flow control options. */ + static final int FLOW_CONTROL_OPTIONS = 9; + /** Total number of settings. */ - static final int COUNT = 0x9; + static final int COUNT = 10; + + /** If set, flow control is disabled for streams directed to the sender of these settings. */ + static final int FLOW_CONTROL_OPTIONS_DISABLED = 0x1; /** Bitfield of which flags that values. */ private int set; @@ -146,6 +152,13 @@ final class Settings { return (bit & set) != 0 ? values[CLIENT_CERTIFICATE_VECTOR_SIZE] : defaultValue; } + // TODO: honor this setting. + boolean isFlowControlDisabled() { + int bit = 1 << FLOW_CONTROL_OPTIONS; + int value = (bit & set) != 0 ? values[FLOW_CONTROL_OPTIONS] : 0; + return (value & FLOW_CONTROL_OPTIONS_DISABLED) != 0; + } + /** * Returns true if this user agent should use this setting in future SPDY * connections to the same host. diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Spdy3.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Spdy3.java index 0fac9d5d8..4001f3e99 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Spdy3.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Spdy3.java @@ -92,23 +92,25 @@ final class Spdy3 implements Variant { } } - @Override public FrameReader newReader(InputStream in) { - return new Reader(in); + @Override public FrameReader newReader(InputStream in, boolean client) { + return new Reader(in, client); } - @Override public FrameWriter newWriter(OutputStream out) { - return new Writer(out); + @Override public FrameWriter newWriter(OutputStream out, boolean client) { + return new Writer(out, client); } /** Read spdy/3 frames. */ static final class Reader implements FrameReader { private final DataInputStream in; private final DataInputStream nameValueBlockIn; + private final boolean client; private int compressedLimit; - Reader(InputStream in) { + Reader(InputStream in, boolean client) { this.in = new DataInputStream(in); this.nameValueBlockIn = newNameValueBlockStream(); + this.client = client; } /** @@ -216,8 +218,12 @@ final class Spdy3 implements Variant { private void readRstStream(Handler handler, int flags, int length) throws IOException { if (length != 8) throw ioException("TYPE_RST_STREAM length: %d != 8", length); int streamId = in.readInt() & 0x7fffffff; - int statusCode = in.readInt(); - handler.rstStream(streamId, statusCode); + int errorCodeInt = in.readInt(); + ErrorCode errorCode = ErrorCode.fromSpdy3Rst(errorCodeInt); + if (errorCode == null) { + throw ioException("TYPE_RST_STREAM unexpected error code: %d", errorCodeInt); + } + handler.rstStream(streamId, errorCode); } private void readHeaders(Handler handler, int flags, int length) throws IOException { @@ -233,7 +239,7 @@ final class Spdy3 implements Variant { int w2 = in.readInt(); int streamId = w1 & 0x7fffffff; int deltaWindowSize = w2 & 0x7fffffff; - handler.windowUpdate(streamId, deltaWindowSize); + handler.windowUpdate(streamId, deltaWindowSize, false); } private DataInputStream newNameValueBlockStream() { @@ -308,14 +314,19 @@ final class Spdy3 implements Variant { private void readPing(Handler handler, int flags, int length) throws IOException { if (length != 4) throw ioException("TYPE_PING length: %d != 4", length); int id = in.readInt(); - handler.ping(id); + boolean reply = client == ((id % 2) == 1); + handler.ping(reply, id, 0); } private void readGoAway(Handler handler, int flags, int length) throws IOException { if (length != 8) throw ioException("TYPE_GOAWAY length: %d != 8", length); int lastGoodStreamId = in.readInt() & 0x7fffffff; - int statusCode = in.readInt(); - handler.goAway(lastGoodStreamId, statusCode); + int errorCodeInt = in.readInt(); + ErrorCode errorCode = ErrorCode.fromSpdyGoAway(errorCodeInt); + if (errorCode == null) { + throw ioException("TYPE_GOAWAY unexpected error code: %d", errorCodeInt); + } + handler.goAway(lastGoodStreamId, errorCode); } private void readSettings(Handler handler, int flags, int length) throws IOException { @@ -349,9 +360,11 @@ final class Spdy3 implements Variant { private final DataOutputStream out; private final ByteArrayOutputStream nameValueBlockBuffer; private final DataOutputStream nameValueBlockOut; + private final boolean client; - Writer(OutputStream out) { + Writer(OutputStream out, boolean client) { this.out = new DataOutputStream(out); + this.client = client; Deflater deflater = new Deflater(); deflater.setDictionary(DICTIONARY); @@ -360,7 +373,7 @@ final class Spdy3 implements Variant { Platform.get().newDeflaterOutputStream(nameValueBlockBuffer, deflater, true)); } - @Override public void connectionHeader() { + @Override public synchronized void connectionHeader() { // Do nothing: no connection header for SPDY/3. } @@ -414,14 +427,16 @@ final class Spdy3 implements Variant { out.flush(); } - @Override public synchronized void rstStream(int streamId, int statusCode) throws IOException { + @Override public synchronized void rstStream(int streamId, ErrorCode errorCode) + throws IOException { + if (errorCode.spdyRstCode == -1) throw new IllegalArgumentException(); int flags = 0; int type = TYPE_RST_STREAM; int length = 8; out.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff); out.writeInt((flags & 0xff) << 24 | length & 0xffffff); out.writeInt(streamId & 0x7fffffff); - out.writeInt(statusCode); + out.writeInt(errorCode.spdyRstCode); out.flush(); } @@ -475,25 +490,29 @@ final class Spdy3 implements Variant { out.flush(); } - @Override public synchronized void ping(int id) throws IOException { + @Override public synchronized void ping(boolean reply, int payload1, int payload2) + throws IOException { + boolean payloadIsReply = client != ((payload1 % 2) == 1); + if (reply != payloadIsReply) throw new IllegalArgumentException("payload != reply"); int type = TYPE_PING; int flags = 0; int length = 4; out.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff); out.writeInt((flags & 0xff) << 24 | length & 0xffffff); - out.writeInt(id); + out.writeInt(payload1); out.flush(); } - @Override public synchronized void goAway(int lastGoodStreamId, int statusCode) + @Override public synchronized void goAway(int lastGoodStreamId, ErrorCode errorCode) throws IOException { + if (errorCode.spdyGoAwayCode == -1) throw new IllegalArgumentException(); int type = TYPE_GOAWAY; int flags = 0; int length = 8; out.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff); out.writeInt((flags & 0xff) << 24 | length & 0xffffff); out.writeInt(lastGoodStreamId); - out.writeInt(statusCode); + out.writeInt(errorCode.spdyGoAwayCode); out.flush(); } diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java index b1ed817d3..16e662c28 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java @@ -55,10 +55,6 @@ public final class SpdyConnection implements Closeable { // operations must synchronize on 'this' last. This ensures that we never // wait for a blocking operation while holding 'this'. - static final int GOAWAY_OK = 0; - static final int GOAWAY_PROTOCOL_ERROR = 1; - static final int GOAWAY_INTERNAL_ERROR = 2; - private static final ExecutorService executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue(), Util.daemonThreadFactory("OkHttp SpdyConnection")); @@ -95,8 +91,8 @@ public final class SpdyConnection implements Closeable { variant = builder.variant; client = builder.client; handler = builder.handler; - frameReader = variant.newReader(builder.in); - frameWriter = variant.newWriter(builder.out); + frameReader = variant.newReader(builder.in, client); + frameWriter = variant.newWriter(builder.out, client); nextStreamId = builder.client ? 1 : 2; nextPingId = builder.client ? 1 : 2; @@ -189,18 +185,18 @@ public final class SpdyConnection implements Closeable { frameWriter.data(outFinished, streamId, buffer, offset, byteCount); } - void writeSynResetLater(final int streamId, final int statusCode) { + void writeSynResetLater(final int streamId, final ErrorCode errorCode) { executor.submit(new NamedRunnable("OkHttp SPDY Writer %s stream %d", hostName, streamId) { @Override public void execute() { try { - writeSynReset(streamId, statusCode); + writeSynReset(streamId, errorCode); } catch (IOException ignored) { } } }); } - void writeSynReset(int streamId, int statusCode) throws IOException { + void writeSynReset(int streamId, ErrorCode statusCode) throws IOException { frameWriter.rstStream(streamId, statusCode); } @@ -235,26 +231,28 @@ public final class SpdyConnection implements Closeable { if (pings == null) pings = new HashMap(); pings.put(pingId, ping); } - writePing(pingId, ping); + writePing(false, pingId, 0x4f4b6f6b /* ASCII "OKok" */, ping); return ping; } - private void writePingLater(final int streamId, final Ping ping) { - executor.submit(new NamedRunnable("OkHttp SPDY Writer %s ping %d", hostName, streamId) { + private void writePingLater( + final boolean reply, final int payload1, final int payload2, final Ping ping) { + executor.submit(new NamedRunnable("OkHttp SPDY Writer %s ping %08x%08x", + hostName, payload1, payload2) { @Override public void execute() { try { - writePing(streamId, ping); + writePing(reply, payload1, payload2, ping); } catch (IOException ignored) { } } }); } - private void writePing(int id, Ping ping) throws IOException { + private void writePing(boolean reply, int payload1, int payload2, Ping ping) throws IOException { synchronized (frameWriter) { // Observe the sent time immediately before performing I/O. if (ping != null) ping.send(); - frameWriter.ping(id); + frameWriter.ping(reply, payload1, payload2); } } @@ -276,11 +274,8 @@ public final class SpdyConnection implements Closeable { * locally, nor accepted from the remote peer. Existing streams are not * impacted. This is intended to permit an endpoint to gracefully stop * accepting new requests without harming previously established streams. - * - * @param statusCode one of {@link #GOAWAY_OK}, {@link - * #GOAWAY_INTERNAL_ERROR} or {@link #GOAWAY_PROTOCOL_ERROR}. */ - public void shutdown(int statusCode) throws IOException { + public void shutdown(ErrorCode statusCode) throws IOException { synchronized (frameWriter) { int lastGoodStreamId; synchronized (this) { @@ -300,14 +295,14 @@ public final class SpdyConnection implements Closeable { * internal executor services. */ @Override public void close() throws IOException { - close(GOAWAY_OK, SpdyStream.RST_CANCEL); + close(ErrorCode.NO_ERROR, ErrorCode.CANCEL); } - private void close(int shutdownStatusCode, int rstStatusCode) throws IOException { + private void close(ErrorCode connectionCode, ErrorCode streamCode) throws IOException { assert (!Thread.holdsLock(this)); IOException thrown = null; try { - shutdown(shutdownStatusCode); + shutdown(connectionCode); } catch (IOException e) { thrown = e; } @@ -329,7 +324,7 @@ public final class SpdyConnection implements Closeable { if (streamsToClose != null) { for (SpdyStream stream : streamsToClose) { try { - stream.close(rstStatusCode); + stream.close(streamCode); } catch (IOException e) { if (thrown != null) thrown = e; } @@ -421,19 +416,19 @@ public final class SpdyConnection implements Closeable { private class Reader implements Runnable, FrameReader.Handler { @Override public void run() { - int shutdownStatusCode = GOAWAY_INTERNAL_ERROR; - int rstStatusCode = SpdyStream.RST_INTERNAL_ERROR; + ErrorCode connectionErrorCode = ErrorCode.INTERNAL_ERROR; + ErrorCode streamErrorCode = ErrorCode.INTERNAL_ERROR; try { while (frameReader.nextFrame(this)) { } - shutdownStatusCode = GOAWAY_OK; - rstStatusCode = SpdyStream.RST_CANCEL; + connectionErrorCode = ErrorCode.NO_ERROR; + streamErrorCode = ErrorCode.CANCEL; } catch (IOException e) { - shutdownStatusCode = GOAWAY_PROTOCOL_ERROR; - rstStatusCode = SpdyStream.RST_PROTOCOL_ERROR; + connectionErrorCode = ErrorCode.PROTOCOL_ERROR; + streamErrorCode = ErrorCode.PROTOCOL_ERROR; } finally { try { - close(shutdownStatusCode, rstStatusCode); + close(connectionErrorCode, streamErrorCode); } catch (IOException ignored) { } } @@ -443,7 +438,7 @@ public final class SpdyConnection implements Closeable { throws IOException { SpdyStream dataStream = getStream(streamId); if (dataStream == null) { - writeSynResetLater(streamId, SpdyStream.RST_INVALID_STREAM); + writeSynResetLater(streamId, ErrorCode.INVALID_STREAM); Util.skipByReading(in, length); return; } @@ -467,7 +462,7 @@ public final class SpdyConnection implements Closeable { previous = streams.put(streamId, synStream); } if (previous != null) { - previous.closeLater(SpdyStream.RST_PROTOCOL_ERROR); + previous.closeLater(ErrorCode.PROTOCOL_ERROR); removeStream(streamId); return; } @@ -487,7 +482,7 @@ public final class SpdyConnection implements Closeable { throws IOException { SpdyStream replyStream = getStream(streamId); if (replyStream == null) { - writeSynResetLater(streamId, SpdyStream.RST_INVALID_STREAM); + writeSynResetLater(streamId, ErrorCode.INVALID_STREAM); return; } replyStream.receiveReply(nameValueBlock); @@ -504,10 +499,10 @@ public final class SpdyConnection implements Closeable { } } - @Override public void rstStream(int streamId, int statusCode) { + @Override public void rstStream(int streamId, ErrorCode errorCode) { SpdyStream rstStream = removeStream(streamId); if (rstStream != null) { - rstStream.receiveRstStream(statusCode); + rstStream.receiveRstStream(errorCode); } } @@ -528,6 +523,7 @@ public final class SpdyConnection implements Closeable { // The synchronization here is ugly. We need to synchronize on 'this' to guard // reads to 'settings'. We synchronize on 'stream' to guard the state change. // And we need to acquire the 'stream' lock first, since that may block. + // TODO: this can block the reader thread until a write completes. That's bad! synchronized (stream) { synchronized (SpdyConnection.this) { stream.receiveSettings(settings); @@ -540,19 +536,19 @@ public final class SpdyConnection implements Closeable { @Override public void noop() { } - @Override public void ping(int streamId) { - if (client != (streamId % 2 == 1)) { - // Respond to a client ping if this is a server and vice versa. - writePingLater(streamId, null); - } else { - Ping ping = removePing(streamId); + @Override public void ping(boolean reply, int payload1, int payload2) { + if (reply) { + Ping ping = removePing(payload1); if (ping != null) { ping.receive(); } + } else { + // Send a reply to a client ping if this is a server and vice versa. + writePingLater(true, payload1, payload2, null); } } - @Override public void goAway(int lastGoodStreamId, int statusCode) { + @Override public void goAway(int lastGoodStreamId, ErrorCode errorCode) { synchronized (SpdyConnection.this) { shutdown = true; @@ -562,18 +558,28 @@ public final class SpdyConnection implements Closeable { Map.Entry entry = i.next(); int streamId = entry.getKey(); if (streamId > lastGoodStreamId && entry.getValue().isLocallyInitiated()) { - entry.getValue().receiveRstStream(SpdyStream.RST_REFUSED_STREAM); + entry.getValue().receiveRstStream(ErrorCode.REFUSED_STREAM); i.remove(); } } } } - @Override public void windowUpdate(int streamId, int deltaWindowSize) { + @Override public void windowUpdate(int streamId, int deltaWindowSize, boolean endFlowControl) { + if (streamId == 0) { + // TODO: honor whole-stream flow control + return; + } + + // TODO: honor endFlowControl SpdyStream stream = getStream(streamId); if (stream != null) { stream.receiveWindowUpdate(deltaWindowSize); } } + + @Override public void priority(int streamId, int priority) { + // TODO: honor priority. + } } } diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyStream.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyStream.java index b5f255594..a0de22d71 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyStream.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyStream.java @@ -33,33 +33,6 @@ public final class SpdyStream { // Internal state is guarded by this. No long-running or potentially // blocking operations are performed while the lock is held. - private static final String[] STATUS_CODE_NAMES = { - null, - "PROTOCOL_ERROR", - "INVALID_STREAM", - "REFUSED_STREAM", - "UNSUPPORTED_VERSION", - "CANCEL", - "INTERNAL_ERROR", - "FLOW_CONTROL_ERROR", - "STREAM_IN_USE", - "STREAM_ALREADY_CLOSED", - "INVALID_CREDENTIALS", - "FRAME_TOO_LARGE" - }; - - public static final int RST_PROTOCOL_ERROR = 1; - public static final int RST_INVALID_STREAM = 2; - public static final int RST_REFUSED_STREAM = 3; - public static final int RST_UNSUPPORTED_VERSION = 4; - public static final int RST_CANCEL = 5; - public static final int RST_INTERNAL_ERROR = 6; - public static final int RST_FLOW_CONTROL_ERROR = 7; - public static final int RST_STREAM_IN_USE = 8; - public static final int RST_STREAM_ALREADY_CLOSED = 9; - public static final int RST_INVALID_CREDENTIALS = 10; - public static final int RST_FRAME_TOO_LARGE = 11; - /** * The number of unacknowledged bytes at which the input stream will send * the peer a {@code WINDOW_UPDATE} frame. Must be less than this client's @@ -89,7 +62,7 @@ public final class SpdyStream { * reasons to abnormally close this stream (such as both peers closing it * near-simultaneously) then this is the first reason known to this peer. */ - private int rstStatusCode = -1; + private ErrorCode errorCode = null; SpdyStream(int id, SpdyConnection connection, boolean outFinished, boolean inFinished, int priority, int slot, List requestHeaders, Settings settings) { @@ -117,7 +90,7 @@ public final class SpdyStream { * reports itself as not open. This is because input data is buffered. */ public synchronized boolean isOpen() { - if (rstStatusCode != -1) { + if (errorCode != null) { return false; } if ((in.finished || in.closed) && (out.finished || out.closed) && responseHeaders != null) { @@ -146,13 +119,13 @@ public final class SpdyStream { */ public synchronized List getResponseHeaders() throws IOException { try { - while (responseHeaders == null && rstStatusCode == -1) { + while (responseHeaders == null && errorCode == null) { wait(); } if (responseHeaders != null) { return responseHeaders; } - throw new IOException("stream was reset: " + rstStatusString()); + throw new IOException("stream was reset: " + errorCode); } catch (InterruptedException e) { InterruptedIOException rethrow = new InterruptedIOException(); rethrow.initCause(e); @@ -161,15 +134,11 @@ public final class SpdyStream { } /** - * Returns the reason why this stream was closed, or -1 if it closed - * normally or has not yet been closed. Valid reasons are {@link - * #RST_PROTOCOL_ERROR}, {@link #RST_INVALID_STREAM}, {@link - * #RST_REFUSED_STREAM}, {@link #RST_UNSUPPORTED_VERSION}, {@link - * #RST_CANCEL}, {@link #RST_INTERNAL_ERROR} and {@link - * #RST_FLOW_CONTROL_ERROR}. + * Returns the reason why this stream was closed, or null if it closed + * normally or has not yet been closed. */ - public synchronized int getRstStatusCode() { - return rstStatusCode; + public synchronized ErrorCode getErrorCode() { + return errorCode; } /** @@ -236,7 +205,7 @@ public final class SpdyStream { * Abnormally terminate this stream. This blocks until the {@code RST_STREAM} * frame has been transmitted. */ - public void close(int rstStatusCode) throws IOException { + public void close(ErrorCode rstStatusCode) throws IOException { if (!closeInternal(rstStatusCode)) { return; // Already closed. } @@ -247,24 +216,24 @@ public final class SpdyStream { * Abnormally terminate this stream. This enqueues a {@code RST_STREAM} * frame and returns immediately. */ - public void closeLater(int rstStatusCode) { - if (!closeInternal(rstStatusCode)) { + public void closeLater(ErrorCode errorCode) { + if (!closeInternal(errorCode)) { return; // Already closed. } - connection.writeSynResetLater(id, rstStatusCode); + connection.writeSynResetLater(id, errorCode); } /** Returns true if this stream was closed. */ - private boolean closeInternal(int rstStatusCode) { + private boolean closeInternal(ErrorCode errorCode) { assert (!Thread.holdsLock(this)); synchronized (this) { - if (this.rstStatusCode != -1) { + if (this.errorCode != null) { return false; } if (in.finished && out.finished) { return false; } - this.rstStatusCode = rstStatusCode; + this.errorCode = errorCode; notifyAll(); } connection.removeStream(id); @@ -285,7 +254,7 @@ public final class SpdyStream { } } if (streamInUseError) { - closeLater(SpdyStream.RST_STREAM_IN_USE); + closeLater(ErrorCode.STREAM_IN_USE); } else if (!open) { connection.removeStream(id); } @@ -305,7 +274,7 @@ public final class SpdyStream { } } if (protocolError) { - closeLater(SpdyStream.RST_PROTOCOL_ERROR); + closeLater(ErrorCode.PROTOCOL_ERROR); } } @@ -327,14 +296,16 @@ public final class SpdyStream { } } - synchronized void receiveRstStream(int statusCode) { - if (rstStatusCode == -1) { - rstStatusCode = statusCode; + synchronized void receiveRstStream(ErrorCode errorCode) { + if (this.errorCode == null) { + this.errorCode = errorCode; notifyAll(); } } private void setSettings(Settings settings) { + // TODO: For HTTP/2.0, also adjust the stream flow control window size + // by the difference between the new value and the old value. assert (Thread.holdsLock(connection)); // Because 'settings' is guarded by 'connection'. this.writeWindowSize = settings != null ? settings.getInitialWindowSize(Settings.DEFAULT_INITIAL_WINDOW_SIZE) @@ -352,11 +323,6 @@ public final class SpdyStream { notifyAll(); } - private String rstStatusString() { - return rstStatusCode > 0 && rstStatusCode < STATUS_CODE_NAMES.length - ? STATUS_CODE_NAMES[rstStatusCode] : Integer.toString(rstStatusCode); - } - int getPriority() { return priority; } @@ -484,7 +450,7 @@ public final class SpdyStream { remaining = readTimeoutMillis; } try { - while (pos == -1 && !finished && !closed && rstStatusCode == -1) { + while (pos == -1 && !finished && !closed && errorCode == null) { if (readTimeoutMillis == 0) { SpdyStream.this.wait(); } else if (remaining > 0) { @@ -522,7 +488,7 @@ public final class SpdyStream { // If the peer sends more data than we can handle, discard it and close the connection. if (flowControlError) { Util.skipByReading(in, byteCount); - closeLater(SpdyStream.RST_FLOW_CONTROL_ERROR); + closeLater(ErrorCode.FLOW_CONTROL_ERROR); return; } @@ -571,8 +537,8 @@ public final class SpdyStream { if (closed) { throw new IOException("stream closed"); } - if (rstStatusCode != -1) { - throw new IOException("stream was reset: " + rstStatusString()); + if (errorCode != null) { + throw new IOException("stream was reset: " + errorCode); } } } @@ -590,7 +556,7 @@ public final class SpdyStream { // is safe because the input stream is closed (we won't use any // further bytes) and the output stream is either finished or closed // (so RSTing both streams doesn't cause harm). - SpdyStream.this.close(RST_CANCEL); + SpdyStream.this.close(ErrorCode.CANCEL); } else if (!open) { connection.removeStream(id); } @@ -693,8 +659,8 @@ public final class SpdyStream { throw new IOException("stream closed"); } else if (finished) { throw new IOException("stream finished"); - } else if (rstStatusCode != -1) { - throw new IOException("stream was reset: " + rstStatusString()); + } else if (errorCode != null) { + throw new IOException("stream was reset: " + errorCode); } } } catch (InterruptedException e) { @@ -708,8 +674,8 @@ public final class SpdyStream { throw new IOException("stream closed"); } else if (finished) { throw new IOException("stream finished"); - } else if (rstStatusCode != -1) { - throw new IOException("stream was reset: " + rstStatusString()); + } else if (errorCode != null) { + throw new IOException("stream was reset: " + errorCode); } } } diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Variant.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Variant.java index 6f2a0f401..b60c70845 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Variant.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Variant.java @@ -23,6 +23,6 @@ interface Variant { Variant SPDY3 = new Spdy3(); Variant HTTP_20_DRAFT_04 = new Http20Draft04(); - FrameReader newReader(InputStream in); - FrameWriter newWriter(OutputStream out); + FrameReader newReader(InputStream in, boolean client); + FrameWriter newWriter(OutputStream out, boolean client); } diff --git a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/MockSpdyPeer.java b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/MockSpdyPeer.java index 765fa4579..82459ca5a 100644 --- a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/MockSpdyPeer.java +++ b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/MockSpdyPeer.java @@ -49,7 +49,7 @@ public final class MockSpdyPeer implements Closeable { public MockSpdyPeer(boolean client) { this.client = client; - this.frameWriter = Variant.SPDY3.newWriter(bytesOut); + this.frameWriter = Variant.SPDY3.newWriter(bytesOut, client); } public void acceptFrame() { @@ -100,7 +100,7 @@ public final class MockSpdyPeer implements Closeable { socket = serverSocket.accept(); OutputStream out = socket.getOutputStream(); InputStream in = socket.getInputStream(); - FrameReader reader = Variant.SPDY3.newReader(in); + FrameReader reader = Variant.SPDY3.newReader(in, client); Iterator outFramesIterator = outFrames.iterator(); byte[] outBytes = bytesOut.toByteArray(); @@ -168,7 +168,6 @@ public final class MockSpdyPeer implements Closeable { public final int sequence; public final FrameReader reader; public int type = -1; - public int flags = -1; public boolean clearPrevious; public boolean outFinished; public boolean inFinished; @@ -176,7 +175,7 @@ public final class MockSpdyPeer implements Closeable { public int associatedStreamId; public int priority; public int slot; - public int statusCode; + public ErrorCode errorCode; public int deltaWindowSize; public List nameValueBlock; public byte[] data; @@ -232,17 +231,17 @@ public final class MockSpdyPeer implements Closeable { Util.readFully(in, this.data); } - @Override public void rstStream(int streamId, int statusCode) { + @Override public void rstStream(int streamId, ErrorCode errorCode) { if (this.type != -1) throw new IllegalStateException(); this.type = Spdy3.TYPE_RST_STREAM; this.streamId = streamId; - this.statusCode = statusCode; + this.errorCode = errorCode; } - @Override public void ping(int streamId) { + @Override public void ping(boolean reply, int payload1, int payload2) { if (this.type != -1) throw new IllegalStateException(); this.type = Spdy3.TYPE_PING; - this.streamId = streamId; + this.streamId = payload1; } @Override public void noop() { @@ -250,18 +249,22 @@ public final class MockSpdyPeer implements Closeable { this.type = Spdy3.TYPE_NOOP; } - @Override public void goAway(int lastGoodStreamId, int statusCode) { + @Override public void goAway(int lastGoodStreamId, ErrorCode errorCode) { if (this.type != -1) throw new IllegalStateException(); this.type = Spdy3.TYPE_GOAWAY; this.streamId = lastGoodStreamId; - this.statusCode = statusCode; + this.errorCode = errorCode; } - @Override public void windowUpdate(int streamId, int deltaWindowSize) { + @Override public void windowUpdate(int streamId, int deltaWindowSize, boolean endFlowControl) { if (this.type != -1) throw new IllegalStateException(); this.type = Spdy3.TYPE_WINDOW_UPDATE; this.streamId = streamId; this.deltaWindowSize = deltaWindowSize; } + + @Override public void priority(int streamId, int priority) { + throw new UnsupportedOperationException(); + } } } \ No newline at end of file diff --git a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/SpdyConnectionTest.java b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/SpdyConnectionTest.java index c5d4ed8f1..45d6ac901 100644 --- a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/SpdyConnectionTest.java +++ b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/SpdyConnectionTest.java @@ -29,6 +29,13 @@ import org.junit.After; import org.junit.Test; import static com.squareup.okhttp.internal.Util.UTF_8; +import static com.squareup.okhttp.internal.spdy.ErrorCode.CANCEL; +import static com.squareup.okhttp.internal.spdy.ErrorCode.FLOW_CONTROL_ERROR; +import static com.squareup.okhttp.internal.spdy.ErrorCode.INTERNAL_ERROR; +import static com.squareup.okhttp.internal.spdy.ErrorCode.INVALID_STREAM; +import static com.squareup.okhttp.internal.spdy.ErrorCode.PROTOCOL_ERROR; +import static com.squareup.okhttp.internal.spdy.ErrorCode.REFUSED_STREAM; +import static com.squareup.okhttp.internal.spdy.ErrorCode.STREAM_IN_USE; import static com.squareup.okhttp.internal.spdy.Settings.PERSIST_VALUE; import static com.squareup.okhttp.internal.spdy.Spdy3.TYPE_DATA; import static com.squareup.okhttp.internal.spdy.Spdy3.TYPE_GOAWAY; @@ -38,13 +45,6 @@ import static com.squareup.okhttp.internal.spdy.Spdy3.TYPE_RST_STREAM; import static com.squareup.okhttp.internal.spdy.Spdy3.TYPE_SYN_REPLY; import static com.squareup.okhttp.internal.spdy.Spdy3.TYPE_SYN_STREAM; import static com.squareup.okhttp.internal.spdy.Spdy3.TYPE_WINDOW_UPDATE; -import static com.squareup.okhttp.internal.spdy.SpdyConnection.GOAWAY_INTERNAL_ERROR; -import static com.squareup.okhttp.internal.spdy.SpdyConnection.GOAWAY_PROTOCOL_ERROR; -import static com.squareup.okhttp.internal.spdy.SpdyStream.RST_FLOW_CONTROL_ERROR; -import static com.squareup.okhttp.internal.spdy.SpdyStream.RST_INVALID_STREAM; -import static com.squareup.okhttp.internal.spdy.SpdyStream.RST_PROTOCOL_ERROR; -import static com.squareup.okhttp.internal.spdy.SpdyStream.RST_REFUSED_STREAM; -import static com.squareup.okhttp.internal.spdy.SpdyStream.RST_STREAM_IN_USE; import static com.squareup.okhttp.internal.spdy.SpdyStream.WINDOW_UPDATE_THRESHOLD; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -108,7 +108,7 @@ public final class SpdyConnectionTest { peer.acceptFrame(); // SYN_STREAM peer.acceptFrame(); // PING peer.sendFrame().synReply(true, 1, Arrays.asList("a", "android")); - peer.sendFrame().ping(1); + peer.sendFrame().ping(true, 1, 0); peer.play(); // play it back @@ -137,7 +137,7 @@ public final class SpdyConnectionTest { @Override public void receive(SpdyStream stream) throws IOException { receiveCount.incrementAndGet(); assertEquals(Arrays.asList("a", "android"), stream.getRequestHeaders()); - assertEquals(-1, stream.getRstStatusCode()); + assertEquals(null, stream.getErrorCode()); assertEquals(5, stream.getPriority()); assertEquals(129, stream.getSlot()); stream.reply(Arrays.asList("b", "banana"), true); @@ -196,7 +196,7 @@ public final class SpdyConnectionTest { @Test public void serverPingsClient() throws Exception { // write the mocking script - peer.sendFrame().ping(2); + peer.sendFrame().ping(false, 2, 0); peer.acceptFrame(); // PING peer.play(); @@ -212,13 +212,13 @@ public final class SpdyConnectionTest { @Test public void clientPingsServer() throws Exception { // write the mocking script peer.acceptFrame(); // PING - peer.sendFrame().ping(1); + peer.sendFrame().ping(true, 1, 0); peer.play(); // play it back - SpdyConnection connection = - new SpdyConnection.Builder(true, peer.openSocket()).handler(REJECT_INCOMING_STREAMS) - .build(); + SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()) + .handler(REJECT_INCOMING_STREAMS) + .build(); Ping ping = connection.ping(); assertTrue(ping.roundTripTime() > 0); assertTrue(ping.roundTripTime() < TimeUnit.SECONDS.toNanos(1)); @@ -231,10 +231,10 @@ public final class SpdyConnectionTest { @Test public void unexpectedPingIsNotReturned() throws Exception { // write the mocking script - peer.sendFrame().ping(2); + peer.sendFrame().ping(false, 2, 0); peer.acceptFrame(); // PING - peer.sendFrame().ping(3); // This ping will not be returned. - peer.sendFrame().ping(4); + peer.sendFrame().ping(true, 3, 0); // This ping will not be returned. + peer.sendFrame().ping(false, 4, 0); peer.acceptFrame(); // PING peer.play(); @@ -253,14 +253,14 @@ public final class SpdyConnectionTest { Settings settings = new Settings(); settings.set(Settings.MAX_CONCURRENT_STREAMS, PERSIST_VALUE, 10); peer.sendFrame().settings(settings); - peer.sendFrame().ping(2); + peer.sendFrame().ping(false, 2, 0); peer.acceptFrame(); // PING peer.play(); // play it back - SpdyConnection connection = - new SpdyConnection.Builder(true, peer.openSocket()).handler(REJECT_INCOMING_STREAMS) - .build(); + SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()) + .handler(REJECT_INCOMING_STREAMS) + .build(); peer.takeFrame(); // Guarantees that the Settings frame has been processed. synchronized (connection) { @@ -280,7 +280,7 @@ public final class SpdyConnectionTest { settings2.set(Settings.DOWNLOAD_RETRANS_RATE, PERSIST_VALUE, 500); settings2.set(Settings.MAX_CONCURRENT_STREAMS, PERSIST_VALUE, 600); peer.sendFrame().settings(settings2); - peer.sendFrame().ping(2); + peer.sendFrame().ping(false, 2, 0); peer.acceptFrame(); peer.play(); @@ -306,7 +306,7 @@ public final class SpdyConnectionTest { // write the mocking script peer.sendFrame().data(true, 42, "bogus".getBytes("UTF-8")); peer.acceptFrame(); // RST_STREAM - peer.sendFrame().ping(2); + peer.sendFrame().ping(false, 2, 0); peer.acceptFrame(); // PING peer.play(); @@ -317,7 +317,7 @@ public final class SpdyConnectionTest { MockSpdyPeer.InFrame rstStream = peer.takeFrame(); assertEquals(TYPE_RST_STREAM, rstStream.type); assertEquals(42, rstStream.streamId); - assertEquals(RST_INVALID_STREAM, rstStream.statusCode); + assertEquals(INVALID_STREAM, rstStream.errorCode); MockSpdyPeer.InFrame ping = peer.takeFrame(); assertEquals(2, ping.streamId); } @@ -326,7 +326,7 @@ public final class SpdyConnectionTest { // write the mocking script peer.sendFrame().synReply(false, 42, Arrays.asList("a", "android")); peer.acceptFrame(); // RST_STREAM - peer.sendFrame().ping(2); + peer.sendFrame().ping(false, 2, 0); peer.acceptFrame(); // PING peer.play(); @@ -337,7 +337,7 @@ public final class SpdyConnectionTest { MockSpdyPeer.InFrame rstStream = peer.takeFrame(); assertEquals(TYPE_RST_STREAM, rstStream.type); assertEquals(42, rstStream.streamId); - assertEquals(RST_INVALID_STREAM, rstStream.statusCode); + assertEquals(INVALID_STREAM, rstStream.errorCode); MockSpdyPeer.InFrame ping = peer.takeFrame(); assertEquals(2, ping.streamId); } @@ -349,7 +349,7 @@ public final class SpdyConnectionTest { peer.acceptFrame(); // TYPE_DATA peer.acceptFrame(); // TYPE_DATA with FLAG_FIN peer.acceptFrame(); // PING - peer.sendFrame().ping(1); + peer.sendFrame().ping(true, 1, 0); peer.play(); // play it back @@ -391,9 +391,9 @@ public final class SpdyConnectionTest { @Test public void serverClosesClientOutputStream() throws Exception { // write the mocking script peer.acceptFrame(); // SYN_STREAM - peer.sendFrame().rstStream(1, SpdyStream.RST_CANCEL); + peer.sendFrame().rstStream(1, CANCEL); peer.acceptFrame(); // PING - peer.sendFrame().ping(1); + peer.sendFrame().ping(true, 1, 0); peer.acceptFrame(); // DATA peer.play(); @@ -467,7 +467,7 @@ public final class SpdyConnectionTest { assertFalse(synStream.outFinished); MockSpdyPeer.InFrame rstStream = peer.takeFrame(); assertEquals(TYPE_RST_STREAM, rstStream.type); - assertEquals(SpdyStream.RST_CANCEL, rstStream.statusCode); + assertEquals(CANCEL, rstStream.errorCode); } /** @@ -515,7 +515,7 @@ public final class SpdyConnectionTest { assertFalse(fin.outFinished); MockSpdyPeer.InFrame rstStream = peer.takeFrame(); assertEquals(TYPE_RST_STREAM, rstStream.type); - assertEquals(SpdyStream.RST_CANCEL, rstStream.statusCode); + assertEquals(CANCEL, rstStream.errorCode); } @Test public void serverClosesClientInputStream() throws Exception { @@ -547,7 +547,7 @@ public final class SpdyConnectionTest { peer.sendFrame().synReply(false, 1, Arrays.asList("a", "android")); peer.acceptFrame(); // PING peer.sendFrame().synReply(false, 1, Arrays.asList("b", "banana")); - peer.sendFrame().ping(1); + peer.sendFrame().ping(true, 1, 0); peer.acceptFrame(); // RST_STREAM peer.play(); @@ -571,7 +571,7 @@ public final class SpdyConnectionTest { MockSpdyPeer.InFrame rstStream = peer.takeFrame(); assertEquals(TYPE_RST_STREAM, rstStream.type); assertEquals(1, rstStream.streamId); - assertEquals(RST_STREAM_IN_USE, rstStream.statusCode); + assertEquals(STREAM_IN_USE, rstStream.errorCode); } @Test public void remoteDoubleSynStream() throws Exception { @@ -588,7 +588,7 @@ public final class SpdyConnectionTest { @Override public void receive(SpdyStream stream) throws IOException { receiveCount.incrementAndGet(); assertEquals(Arrays.asList("a", "android"), stream.getRequestHeaders()); - assertEquals(-1, stream.getRstStatusCode()); + assertEquals(null, stream.getErrorCode()); stream.reply(Arrays.asList("c", "cola"), true); } }; @@ -600,7 +600,7 @@ public final class SpdyConnectionTest { MockSpdyPeer.InFrame rstStream = peer.takeFrame(); assertEquals(TYPE_RST_STREAM, rstStream.type); assertEquals(2, rstStream.streamId); - assertEquals(RST_PROTOCOL_ERROR, rstStream.statusCode); + assertEquals(PROTOCOL_ERROR, rstStream.errorCode); assertEquals(1, receiveCount.intValue()); } @@ -610,7 +610,7 @@ public final class SpdyConnectionTest { peer.sendFrame().synReply(false, 1, Arrays.asList("a", "android")); peer.sendFrame().data(true, 1, "robot".getBytes("UTF-8")); peer.sendFrame().data(true, 1, "c3po".getBytes("UTF-8")); // Ignored. - peer.sendFrame().ping(2); // Ping just to make sure the stream was fastforwarded. + peer.sendFrame().ping(false, 2, 0); // Ping just to make sure the stream was fastforwarded. peer.acceptFrame(); // PING peer.play(); @@ -634,7 +634,7 @@ public final class SpdyConnectionTest { peer.sendFrame().synReply(false, 1, Arrays.asList("b", "banana")); peer.sendFrame().data(false, 1, new byte[64 * 1024 + 1]); peer.acceptFrame(); // RST_STREAM - peer.sendFrame().ping(2); // Ping just to make sure the stream was fastforwarded. + peer.sendFrame().ping(false, 2, 0); // Ping just to make sure the stream was fastforwarded. peer.acceptFrame(); // PING peer.play(); @@ -649,7 +649,7 @@ public final class SpdyConnectionTest { MockSpdyPeer.InFrame rstStream = peer.takeFrame(); assertEquals(TYPE_RST_STREAM, rstStream.type); assertEquals(1, rstStream.streamId); - assertEquals(RST_FLOW_CONTROL_ERROR, rstStream.statusCode); + assertEquals(FLOW_CONTROL_ERROR, rstStream.errorCode); MockSpdyPeer.InFrame ping = peer.takeFrame(); assertEquals(TYPE_PING, ping.type); assertEquals(2, ping.streamId); @@ -658,8 +658,8 @@ public final class SpdyConnectionTest { @Test public void remoteSendsRefusedStreamBeforeReplyHeaders() throws Exception { // write the mocking script peer.acceptFrame(); // SYN_STREAM - peer.sendFrame().rstStream(1, RST_REFUSED_STREAM); - peer.sendFrame().ping(2); + peer.sendFrame().rstStream(1, REFUSED_STREAM); + peer.sendFrame().ping(false, 2, 0); peer.acceptFrame(); // PING peer.play(); @@ -686,9 +686,9 @@ public final class SpdyConnectionTest { // write the mocking script peer.acceptFrame(); // SYN_STREAM 1 peer.acceptFrame(); // SYN_STREAM 3 - peer.sendFrame().goAway(1, GOAWAY_PROTOCOL_ERROR); + peer.sendFrame().goAway(1, PROTOCOL_ERROR); peer.acceptFrame(); // PING - peer.sendFrame().ping(1); + peer.sendFrame().ping(true, 1, 0); peer.acceptFrame(); // DATA STREAM 1 peer.play(); @@ -733,14 +733,14 @@ public final class SpdyConnectionTest { peer.acceptFrame(); // GOAWAY peer.acceptFrame(); // PING peer.sendFrame().synStream(false, false, 2, 0, 0, 0, Arrays.asList("b", "b")); // Should be ignored! - peer.sendFrame().ping(1); + peer.sendFrame().ping(true, 1, 0); peer.play(); // play it back SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); connection.newStream(Arrays.asList("a", "android"), true, true); Ping ping = connection.ping(); - connection.shutdown(GOAWAY_PROTOCOL_ERROR); + connection.shutdown(PROTOCOL_ERROR); assertEquals(1, connection.openStreamCount()); ping.roundTripTime(); // Prevent the peer from exiting prematurely. @@ -752,7 +752,7 @@ public final class SpdyConnectionTest { MockSpdyPeer.InFrame goaway = peer.takeFrame(); assertEquals(TYPE_GOAWAY, goaway.type); assertEquals(0, goaway.streamId); - assertEquals(GOAWAY_PROTOCOL_ERROR, goaway.statusCode); + assertEquals(PROTOCOL_ERROR, goaway.errorCode); } @Test public void noPingsAfterShutdown() throws Exception { @@ -762,7 +762,7 @@ public final class SpdyConnectionTest { // play it back SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); - connection.shutdown(GOAWAY_INTERNAL_ERROR); + connection.shutdown(INTERNAL_ERROR); try { connection.ping(); fail(); @@ -773,7 +773,7 @@ public final class SpdyConnectionTest { // verify the peer received what was expected MockSpdyPeer.InFrame goaway = peer.takeFrame(); assertEquals(TYPE_GOAWAY, goaway.type); - assertEquals(GOAWAY_INTERNAL_ERROR, goaway.statusCode); + assertEquals(INTERNAL_ERROR, goaway.errorCode); } @Test public void close() throws Exception { @@ -836,7 +836,7 @@ public final class SpdyConnectionTest { peer.acceptFrame(); // SYN_STREAM peer.sendFrame().synReply(false, 1, Arrays.asList("a", "android")); peer.acceptFrame(); // PING - peer.sendFrame().ping(1); + peer.sendFrame().ping(true, 1, 0); peer.play(); // play it back @@ -866,7 +866,7 @@ public final class SpdyConnectionTest { peer.acceptFrame(); // PING peer.sendFrame().synReply(false, 1, Arrays.asList("a", "android")); peer.sendFrame().headers(1, Arrays.asList("c", "c3po")); - peer.sendFrame().ping(1); + peer.sendFrame().ping(true, 1, 0); peer.play(); // play it back @@ -888,7 +888,7 @@ public final class SpdyConnectionTest { peer.acceptFrame(); // PING peer.sendFrame().headers(1, Arrays.asList("c", "c3po")); peer.acceptFrame(); // RST_STREAM - peer.sendFrame().ping(1); + peer.sendFrame().ping(true, 1, 0); peer.play(); // play it back @@ -909,7 +909,7 @@ public final class SpdyConnectionTest { assertEquals(TYPE_PING, ping.type); MockSpdyPeer.InFrame rstStream = peer.takeFrame(); assertEquals(TYPE_RST_STREAM, rstStream.type); - assertEquals(RST_PROTOCOL_ERROR, rstStream.statusCode); + assertEquals(PROTOCOL_ERROR, rstStream.errorCode); } @Test public void readSendsWindowUpdate() throws Exception { diff --git a/okhttp/src/main/java/com/squareup/okhttp/internal/http/SpdyTransport.java b/okhttp/src/main/java/com/squareup/okhttp/internal/http/SpdyTransport.java index f87d61913..fce58f474 100644 --- a/okhttp/src/main/java/com/squareup/okhttp/internal/http/SpdyTransport.java +++ b/okhttp/src/main/java/com/squareup/okhttp/internal/http/SpdyTransport.java @@ -16,6 +16,7 @@ package com.squareup.okhttp.internal.http; +import com.squareup.okhttp.internal.spdy.ErrorCode; import com.squareup.okhttp.internal.spdy.SpdyConnection; import com.squareup.okhttp.internal.spdy.SpdyStream; import java.io.IOException; @@ -84,7 +85,7 @@ public final class SpdyTransport implements Transport { InputStream responseBodyIn) { if (streamCanceled) { if (stream != null) { - stream.closeLater(SpdyStream.RST_CANCEL); + stream.closeLater(ErrorCode.CANCEL); return true; } else { // If stream is null, it either means that writeRequestHeaders wasn't called