1
0
mirror of https://github.com/square/okhttp.git synced 2026-01-25 16:01:38 +03:00

Read HTTP/2.0 frames and error codes.

This introduces a new ErrorCode class that identifies
codes for HTTP/2.0, SPDY/3 RST codes, and SPDY/3 GO_AWAY
codes.
This commit is contained in:
jwilson
2013-08-17 20:01:33 -04:00
parent 540a885043
commit 6e382aa9b8
14 changed files with 434 additions and 220 deletions

View File

@@ -262,6 +262,8 @@ public final class Util {
* buffer.
*/
public static long skipByReading(InputStream in, long byteCount) throws IOException {
if (byteCount == 0) return 0L;
// acquire the shared skip buffer.
byte[] buffer = skipBuffer.getAndSet(null);
if (buffer == null) {

View File

@@ -0,0 +1,67 @@
package com.squareup.okhttp.internal.spdy;
public enum ErrorCode {
/** Not an error! For SPDY stream resets, prefer null over NO_ERROR. */
NO_ERROR(0, -1, 0),
PROTOCOL_ERROR(1, 1, 1),
/** A subtype of PROTOCOL_ERROR used by SPDY. */
INVALID_STREAM(1, 2, -1),
/** A subtype of PROTOCOL_ERROR used by SPDY. */
UNSUPPORTED_VERSION(1, 4, -1),
/** A subtype of PROTOCOL_ERROR used by SPDY. */
STREAM_IN_USE(1, 8, -1),
/** A subtype of PROTOCOL_ERROR used by SPDY. */
STREAM_ALREADY_CLOSED(1, 9, -1),
INTERNAL_ERROR(2, 6, 2),
FLOW_CONTROL_ERROR(3, 7, -1),
STREAM_CLOSED(5, -1, -1),
FRAME_TOO_LARGE(6, 11, -1),
REFUSED_STREAM(7, 3, -1),
CANCEL(8, 5, -1),
COMPRESSION_ERROR(9, -1, -1),
INVALID_CREDENTIALS(-1, 10, -1);
public final int httpCode;
public final int spdyRstCode;
public final int spdyGoAwayCode;
private ErrorCode(int httpCode, int spdyRstCode, int spdyGoAwayCode) {
this.httpCode = httpCode;
this.spdyRstCode = spdyRstCode;
this.spdyGoAwayCode = spdyGoAwayCode;
}
public static ErrorCode fromSpdy3Rst(int code) {
for (ErrorCode errorCode : ErrorCode.values()) {
if (errorCode.spdyRstCode == code) return errorCode;
}
return null;
}
public static ErrorCode fromHttp2(int code) {
for (ErrorCode errorCode : ErrorCode.values()) {
if (errorCode.httpCode == code) return errorCode;
}
return null;
}
public static ErrorCode fromSpdyGoAway(int code) {
for (ErrorCode errorCode : ErrorCode.values()) {
if (errorCode.spdyGoAwayCode == code) return errorCode;
}
return null;
}
}

View File

@@ -31,11 +31,12 @@ public interface FrameReader extends Closeable {
int priority, int slot, List<String> nameValueBlock);
void synReply(boolean inFinished, int streamId, List<String> nameValueBlock) throws IOException;
void headers(int streamId, List<String> nameValueBlock) throws IOException;
void rstStream(int streamId, int statusCode);
void rstStream(int streamId, ErrorCode errorCode);
void settings(boolean clearPrevious, Settings settings);
void noop();
void ping(int streamId);
void goAway(int lastGoodStreamId, int statusCode);
void windowUpdate(int streamId, int deltaWindowSize);
void ping(boolean reply, int payload1, int payload2);
void goAway(int lastGoodStreamId, ErrorCode errorCode);
void windowUpdate(int streamId, int deltaWindowSize, boolean endFlowControl);
void priority(int streamId, int priority);
}
}

View File

@@ -31,13 +31,13 @@ public interface FrameWriter extends Closeable {
int priority, int slot, List<String> nameValueBlock) throws IOException;
void synReply(boolean outFinished, int streamId, List<String> nameValueBlock) throws IOException;
void headers(int streamId, List<String> nameValueBlock) throws IOException;
void rstStream(int streamId, int statusCode) throws IOException;
void rstStream(int streamId, ErrorCode errorCode) throws IOException;
void data(boolean outFinished, int streamId, byte[] data) throws IOException;
void data(boolean outFinished, int streamId, byte[] data, int offset, int byteCount)
throws IOException;
void settings(Settings settings) throws IOException;
void noop() throws IOException;
void ping(int id) throws IOException;
void goAway(int lastGoodStreamId, int statusCode) throws IOException;
void ping(boolean reply, int payload1, int payload2) throws IOException;
void goAway(int lastGoodStreamId, ErrorCode errorCode) throws IOException;
void windowUpdate(int streamId, int deltaWindowSize) throws IOException;
}

View File

@@ -15,6 +15,7 @@
*/
package com.squareup.okhttp.internal.spdy;
import com.squareup.okhttp.internal.Util;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -23,11 +24,27 @@ import java.io.OutputStream;
import java.util.List;
final class Http20Draft04 implements Variant {
@Override public FrameReader newReader(InputStream in) {
static final int TYPE_DATA = 0x0;
static final int TYPE_HEADERS = 0x1;
static final int TYPE_PRIORITY = 0x2;
static final int TYPE_RST_STREAM = 0x3;
static final int TYPE_SETTINGS = 0x4;
static final int TYPE_PUSH_PROMISE = 0x5;
static final int TYPE_PING = 0x6;
static final int TYPE_GOAWAY = 0x7;
static final int TYPE_WINDOW_UPDATE = 0x9;
static final int FLAG_END_STREAM = 0x1;
static final int FLAG_END_HEADERS = 0x4;
static final int FLAG_PRIORITY = 0x8;
static final int FLAG_PONG = 0x1;
static final int FLAG_END_FLOW_CONTROL = 0x1;
@Override public FrameReader newReader(InputStream in, boolean client) {
return new Reader(in);
}
@Override public FrameWriter newWriter(OutputStream out) {
@Override public FrameWriter newWriter(OutputStream out, boolean client) {
return new Writer(out);
}
@@ -50,12 +67,130 @@ final class Http20Draft04 implements Variant {
int length = w1 & 0xffff;
int type = (w1 & 0xff0000) >> 16;
int flags = (w1 & 0xff000000) >> 24;
boolean r = (w2 & 0x80000000) != 0;
// boolean r = (w2 & 0x80000000) != 0; // Reserved.
int streamId = (w2 & 0x7fffffff);
switch (type) {
case TYPE_DATA:
readData(handler, flags, length, streamId);
return true;
case TYPE_PRIORITY:
readPriority(handler, flags, length, streamId);
return true;
case TYPE_RST_STREAM:
readRstStream(handler, flags, length, streamId);
return true;
case TYPE_SETTINGS:
readSettings(handler, flags, length, streamId);
return true;
case TYPE_PUSH_PROMISE:
readPushPromise(handler, flags, length, streamId);
return true;
case TYPE_PING:
readPing(handler, flags, length, streamId);
return true;
case TYPE_GOAWAY:
readGoAway(handler, flags, length, streamId);
return true;
case TYPE_WINDOW_UPDATE:
readWindowUpdate(handler, flags, length, streamId);
return true;
}
throw new UnsupportedOperationException("TODO");
}
private void readData(Handler handler, int flags, int length, int streamId) throws IOException {
boolean inFinished = (flags & FLAG_END_STREAM) != 0;
handler.data(inFinished, streamId, in, length);
}
private void readPriority(Handler handler, int flags, int length, int streamId)
throws IOException {
if (length != 4) throw ioException("TYPE_PRIORITY length: %d != 4", length);
if (streamId == 0) throw ioException("TYPE_PRIORITY streamId == 0");
int w1 = in.readInt();
// boolean r = (w1 & 0x80000000) != 0; // Reserved.
int priority = (w1 & 0x7fffffff);
handler.priority(streamId, priority);
}
private void readRstStream(Handler handler, int flags, int length, int streamId)
throws IOException {
if (length != 4) throw ioException("TYPE_RST_STREAM length: %d != 4", length);
if (streamId == 0) throw ioException("TYPE_RST_STREAM streamId == 0");
int errorCodeInt = in.readInt();
ErrorCode errorCode = ErrorCode.fromHttp2(errorCodeInt);
if (errorCode == null) {
throw ioException("TYPE_RST_STREAM unexpected error code: %d", errorCodeInt);
}
handler.rstStream(streamId, errorCode);
}
private void readSettings(Handler handler, int flags, int length, int streamId)
throws IOException {
if (length % 8 != 0) throw ioException("TYPE_SETTINGS length %% 8 != 0: %s", length);
if (streamId != 0) throw ioException("TYPE_SETTINGS streamId != 0");
Settings settings = new Settings();
for (int i = 0; i < length; i += 8) {
int w1 = in.readInt();
int value = in.readInt();
// int r = (w1 & 0xff000000) >>> 24; // Reserved.
int id = w1 & 0xffffff;
settings.set(id, 0, value);
}
handler.settings(false, settings);
}
private void readPushPromise(Handler handler, int flags, int length, int streamId) {
// TODO:
}
private void readPing(Handler handler, int flags, int length, int streamId) throws IOException {
if (length != 8) throw ioException("TYPE_PING length != 8: %s", length);
if (streamId != 0) throw ioException("TYPE_PING streamId != 0");
int payload1 = in.readInt();
int payload2 = in.readInt();
boolean reply = (flags & FLAG_PONG) != 0;
handler.ping(reply, payload1, payload2);
}
private void readGoAway(Handler handler, int flags, int length, int streamId)
throws IOException {
if (length < 8) throw ioException("TYPE_GOAWAY length < 8: %s", length);
int lastStreamId = in.readInt();
int errorCodeInt = in.readInt();
int opaqueDataLength = length - 8;
ErrorCode errorCode = ErrorCode.fromHttp2(errorCodeInt);
if (errorCode == null) {
throw ioException("TYPE_RST_STREAM unexpected error code: %d", errorCodeInt);
}
if (Util.skipByReading(in, opaqueDataLength) != opaqueDataLength) {
throw new IOException("TYPE_GOAWAY opaque data was truncated");
}
handler.goAway(lastStreamId, errorCode);
}
private void readWindowUpdate(Handler handler, int flags, int length, int streamId)
throws IOException {
int w1 = in.readInt();
// boolean r = (w1 & 0x80000000) != 0; // Reserved.
int windowSizeIncrement = (w1 & 0x7fffffff);
boolean endFlowControl = (flags & FLAG_END_FLOW_CONTROL) != 0;
handler.windowUpdate(streamId, windowSizeIncrement, endFlowControl);
}
private static IOException ioException(String message, Object... args) throws IOException {
throw new IOException(String.format(message, args));
}
@Override public void close() throws IOException {
in.close();
}
@@ -92,17 +227,17 @@ final class Http20Draft04 implements Variant {
throw new UnsupportedOperationException("TODO");
}
@Override public synchronized void rstStream(int streamId, int statusCode) throws IOException {
@Override public synchronized void rstStream(int streamId, ErrorCode errorCode)
throws IOException {
throw new UnsupportedOperationException("TODO");
}
@Override public synchronized void data(boolean outFinished, int streamId, byte[] data)
throws IOException {
@Override public void data(boolean outFinished, int streamId, byte[] data) throws IOException {
data(outFinished, streamId, data, 0, data.length);
}
@Override public void data(boolean outFinished, int streamId, byte[] data, int offset,
int byteCount) throws IOException {
@Override public synchronized void data(boolean outFinished, int streamId, byte[] data,
int offset, int byteCount) throws IOException {
throw new UnsupportedOperationException("TODO");
}
@@ -114,11 +249,12 @@ final class Http20Draft04 implements Variant {
throw new UnsupportedOperationException("TODO");
}
@Override public synchronized void ping(int id) throws IOException {
@Override public synchronized void ping(boolean reply, int payload1, int payload2)
throws IOException {
throw new UnsupportedOperationException("TODO");
}
@Override public synchronized void goAway(int lastGoodStreamId, int statusCode)
@Override public synchronized void goAway(int lastGoodStreamId, ErrorCode errorCode)
throws IOException {
throw new UnsupportedOperationException("TODO");
}

View File

@@ -22,7 +22,7 @@ import java.io.IOException;
public interface IncomingStreamHandler {
IncomingStreamHandler REFUSE_INCOMING_STREAMS = new IncomingStreamHandler() {
@Override public void receive(SpdyStream stream) throws IOException {
stream.close(SpdyStream.RST_REFUSED_STREAM);
stream.close(ErrorCode.REFUSED_STREAM);
}
};

View File

@@ -31,23 +31,29 @@ final class Settings {
static final int PERSISTED = 0x2;
/** Sender's estimate of max incoming kbps. */
static final int UPLOAD_BANDWIDTH = 0x1;
static final int UPLOAD_BANDWIDTH = 1;
/** Sender's estimate of max outgoing kbps. */
static final int DOWNLOAD_BANDWIDTH = 0x2;
static final int DOWNLOAD_BANDWIDTH = 2;
/** Sender's estimate of milliseconds between sending a request and receiving a response. */
static final int ROUND_TRIP_TIME = 0x3;
static final int ROUND_TRIP_TIME = 3;
/** Sender's maximum number of concurrent streams. */
static final int MAX_CONCURRENT_STREAMS = 0x4;
static final int MAX_CONCURRENT_STREAMS = 4;
/** Current CWND in Packets. */
static final int CURRENT_CWND = 0x5;
static final int CURRENT_CWND = 5;
/** Retransmission rate. Percentage */
static final int DOWNLOAD_RETRANS_RATE = 0x6;
static final int DOWNLOAD_RETRANS_RATE = 6;
/** Window size in bytes. */
static final int INITIAL_WINDOW_SIZE = 0x7;
static final int INITIAL_WINDOW_SIZE = 7;
/** Window size in bytes. */
static final int CLIENT_CERTIFICATE_VECTOR_SIZE = 0x8;
static final int CLIENT_CERTIFICATE_VECTOR_SIZE = 8;
/** Flow control options. */
static final int FLOW_CONTROL_OPTIONS = 9;
/** Total number of settings. */
static final int COUNT = 0x9;
static final int COUNT = 10;
/** If set, flow control is disabled for streams directed to the sender of these settings. */
static final int FLOW_CONTROL_OPTIONS_DISABLED = 0x1;
/** Bitfield of which flags that values. */
private int set;
@@ -146,6 +152,13 @@ final class Settings {
return (bit & set) != 0 ? values[CLIENT_CERTIFICATE_VECTOR_SIZE] : defaultValue;
}
// TODO: honor this setting.
boolean isFlowControlDisabled() {
int bit = 1 << FLOW_CONTROL_OPTIONS;
int value = (bit & set) != 0 ? values[FLOW_CONTROL_OPTIONS] : 0;
return (value & FLOW_CONTROL_OPTIONS_DISABLED) != 0;
}
/**
* Returns true if this user agent should use this setting in future SPDY
* connections to the same host.

View File

@@ -92,23 +92,25 @@ final class Spdy3 implements Variant {
}
}
@Override public FrameReader newReader(InputStream in) {
return new Reader(in);
@Override public FrameReader newReader(InputStream in, boolean client) {
return new Reader(in, client);
}
@Override public FrameWriter newWriter(OutputStream out) {
return new Writer(out);
@Override public FrameWriter newWriter(OutputStream out, boolean client) {
return new Writer(out, client);
}
/** Read spdy/3 frames. */
static final class Reader implements FrameReader {
private final DataInputStream in;
private final DataInputStream nameValueBlockIn;
private final boolean client;
private int compressedLimit;
Reader(InputStream in) {
Reader(InputStream in, boolean client) {
this.in = new DataInputStream(in);
this.nameValueBlockIn = newNameValueBlockStream();
this.client = client;
}
/**
@@ -216,8 +218,12 @@ final class Spdy3 implements Variant {
private void readRstStream(Handler handler, int flags, int length) throws IOException {
if (length != 8) throw ioException("TYPE_RST_STREAM length: %d != 8", length);
int streamId = in.readInt() & 0x7fffffff;
int statusCode = in.readInt();
handler.rstStream(streamId, statusCode);
int errorCodeInt = in.readInt();
ErrorCode errorCode = ErrorCode.fromSpdy3Rst(errorCodeInt);
if (errorCode == null) {
throw ioException("TYPE_RST_STREAM unexpected error code: %d", errorCodeInt);
}
handler.rstStream(streamId, errorCode);
}
private void readHeaders(Handler handler, int flags, int length) throws IOException {
@@ -233,7 +239,7 @@ final class Spdy3 implements Variant {
int w2 = in.readInt();
int streamId = w1 & 0x7fffffff;
int deltaWindowSize = w2 & 0x7fffffff;
handler.windowUpdate(streamId, deltaWindowSize);
handler.windowUpdate(streamId, deltaWindowSize, false);
}
private DataInputStream newNameValueBlockStream() {
@@ -308,14 +314,19 @@ final class Spdy3 implements Variant {
private void readPing(Handler handler, int flags, int length) throws IOException {
if (length != 4) throw ioException("TYPE_PING length: %d != 4", length);
int id = in.readInt();
handler.ping(id);
boolean reply = client == ((id % 2) == 1);
handler.ping(reply, id, 0);
}
private void readGoAway(Handler handler, int flags, int length) throws IOException {
if (length != 8) throw ioException("TYPE_GOAWAY length: %d != 8", length);
int lastGoodStreamId = in.readInt() & 0x7fffffff;
int statusCode = in.readInt();
handler.goAway(lastGoodStreamId, statusCode);
int errorCodeInt = in.readInt();
ErrorCode errorCode = ErrorCode.fromSpdyGoAway(errorCodeInt);
if (errorCode == null) {
throw ioException("TYPE_GOAWAY unexpected error code: %d", errorCodeInt);
}
handler.goAway(lastGoodStreamId, errorCode);
}
private void readSettings(Handler handler, int flags, int length) throws IOException {
@@ -349,9 +360,11 @@ final class Spdy3 implements Variant {
private final DataOutputStream out;
private final ByteArrayOutputStream nameValueBlockBuffer;
private final DataOutputStream nameValueBlockOut;
private final boolean client;
Writer(OutputStream out) {
Writer(OutputStream out, boolean client) {
this.out = new DataOutputStream(out);
this.client = client;
Deflater deflater = new Deflater();
deflater.setDictionary(DICTIONARY);
@@ -360,7 +373,7 @@ final class Spdy3 implements Variant {
Platform.get().newDeflaterOutputStream(nameValueBlockBuffer, deflater, true));
}
@Override public void connectionHeader() {
@Override public synchronized void connectionHeader() {
// Do nothing: no connection header for SPDY/3.
}
@@ -414,14 +427,16 @@ final class Spdy3 implements Variant {
out.flush();
}
@Override public synchronized void rstStream(int streamId, int statusCode) throws IOException {
@Override public synchronized void rstStream(int streamId, ErrorCode errorCode)
throws IOException {
if (errorCode.spdyRstCode == -1) throw new IllegalArgumentException();
int flags = 0;
int type = TYPE_RST_STREAM;
int length = 8;
out.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff);
out.writeInt((flags & 0xff) << 24 | length & 0xffffff);
out.writeInt(streamId & 0x7fffffff);
out.writeInt(statusCode);
out.writeInt(errorCode.spdyRstCode);
out.flush();
}
@@ -475,25 +490,29 @@ final class Spdy3 implements Variant {
out.flush();
}
@Override public synchronized void ping(int id) throws IOException {
@Override public synchronized void ping(boolean reply, int payload1, int payload2)
throws IOException {
boolean payloadIsReply = client != ((payload1 % 2) == 1);
if (reply != payloadIsReply) throw new IllegalArgumentException("payload != reply");
int type = TYPE_PING;
int flags = 0;
int length = 4;
out.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff);
out.writeInt((flags & 0xff) << 24 | length & 0xffffff);
out.writeInt(id);
out.writeInt(payload1);
out.flush();
}
@Override public synchronized void goAway(int lastGoodStreamId, int statusCode)
@Override public synchronized void goAway(int lastGoodStreamId, ErrorCode errorCode)
throws IOException {
if (errorCode.spdyGoAwayCode == -1) throw new IllegalArgumentException();
int type = TYPE_GOAWAY;
int flags = 0;
int length = 8;
out.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff);
out.writeInt((flags & 0xff) << 24 | length & 0xffffff);
out.writeInt(lastGoodStreamId);
out.writeInt(statusCode);
out.writeInt(errorCode.spdyGoAwayCode);
out.flush();
}

View File

@@ -55,10 +55,6 @@ public final class SpdyConnection implements Closeable {
// operations must synchronize on 'this' last. This ensures that we never
// wait for a blocking operation while holding 'this'.
static final int GOAWAY_OK = 0;
static final int GOAWAY_PROTOCOL_ERROR = 1;
static final int GOAWAY_INTERNAL_ERROR = 2;
private static final ExecutorService executor = new ThreadPoolExecutor(0,
Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
Util.daemonThreadFactory("OkHttp SpdyConnection"));
@@ -95,8 +91,8 @@ public final class SpdyConnection implements Closeable {
variant = builder.variant;
client = builder.client;
handler = builder.handler;
frameReader = variant.newReader(builder.in);
frameWriter = variant.newWriter(builder.out);
frameReader = variant.newReader(builder.in, client);
frameWriter = variant.newWriter(builder.out, client);
nextStreamId = builder.client ? 1 : 2;
nextPingId = builder.client ? 1 : 2;
@@ -189,18 +185,18 @@ public final class SpdyConnection implements Closeable {
frameWriter.data(outFinished, streamId, buffer, offset, byteCount);
}
void writeSynResetLater(final int streamId, final int statusCode) {
void writeSynResetLater(final int streamId, final ErrorCode errorCode) {
executor.submit(new NamedRunnable("OkHttp SPDY Writer %s stream %d", hostName, streamId) {
@Override public void execute() {
try {
writeSynReset(streamId, statusCode);
writeSynReset(streamId, errorCode);
} catch (IOException ignored) {
}
}
});
}
void writeSynReset(int streamId, int statusCode) throws IOException {
void writeSynReset(int streamId, ErrorCode statusCode) throws IOException {
frameWriter.rstStream(streamId, statusCode);
}
@@ -235,26 +231,28 @@ public final class SpdyConnection implements Closeable {
if (pings == null) pings = new HashMap<Integer, Ping>();
pings.put(pingId, ping);
}
writePing(pingId, ping);
writePing(false, pingId, 0x4f4b6f6b /* ASCII "OKok" */, ping);
return ping;
}
private void writePingLater(final int streamId, final Ping ping) {
executor.submit(new NamedRunnable("OkHttp SPDY Writer %s ping %d", hostName, streamId) {
private void writePingLater(
final boolean reply, final int payload1, final int payload2, final Ping ping) {
executor.submit(new NamedRunnable("OkHttp SPDY Writer %s ping %08x%08x",
hostName, payload1, payload2) {
@Override public void execute() {
try {
writePing(streamId, ping);
writePing(reply, payload1, payload2, ping);
} catch (IOException ignored) {
}
}
});
}
private void writePing(int id, Ping ping) throws IOException {
private void writePing(boolean reply, int payload1, int payload2, Ping ping) throws IOException {
synchronized (frameWriter) {
// Observe the sent time immediately before performing I/O.
if (ping != null) ping.send();
frameWriter.ping(id);
frameWriter.ping(reply, payload1, payload2);
}
}
@@ -276,11 +274,8 @@ public final class SpdyConnection implements Closeable {
* locally, nor accepted from the remote peer. Existing streams are not
* impacted. This is intended to permit an endpoint to gracefully stop
* accepting new requests without harming previously established streams.
*
* @param statusCode one of {@link #GOAWAY_OK}, {@link
* #GOAWAY_INTERNAL_ERROR} or {@link #GOAWAY_PROTOCOL_ERROR}.
*/
public void shutdown(int statusCode) throws IOException {
public void shutdown(ErrorCode statusCode) throws IOException {
synchronized (frameWriter) {
int lastGoodStreamId;
synchronized (this) {
@@ -300,14 +295,14 @@ public final class SpdyConnection implements Closeable {
* internal executor services.
*/
@Override public void close() throws IOException {
close(GOAWAY_OK, SpdyStream.RST_CANCEL);
close(ErrorCode.NO_ERROR, ErrorCode.CANCEL);
}
private void close(int shutdownStatusCode, int rstStatusCode) throws IOException {
private void close(ErrorCode connectionCode, ErrorCode streamCode) throws IOException {
assert (!Thread.holdsLock(this));
IOException thrown = null;
try {
shutdown(shutdownStatusCode);
shutdown(connectionCode);
} catch (IOException e) {
thrown = e;
}
@@ -329,7 +324,7 @@ public final class SpdyConnection implements Closeable {
if (streamsToClose != null) {
for (SpdyStream stream : streamsToClose) {
try {
stream.close(rstStatusCode);
stream.close(streamCode);
} catch (IOException e) {
if (thrown != null) thrown = e;
}
@@ -421,19 +416,19 @@ public final class SpdyConnection implements Closeable {
private class Reader implements Runnable, FrameReader.Handler {
@Override public void run() {
int shutdownStatusCode = GOAWAY_INTERNAL_ERROR;
int rstStatusCode = SpdyStream.RST_INTERNAL_ERROR;
ErrorCode connectionErrorCode = ErrorCode.INTERNAL_ERROR;
ErrorCode streamErrorCode = ErrorCode.INTERNAL_ERROR;
try {
while (frameReader.nextFrame(this)) {
}
shutdownStatusCode = GOAWAY_OK;
rstStatusCode = SpdyStream.RST_CANCEL;
connectionErrorCode = ErrorCode.NO_ERROR;
streamErrorCode = ErrorCode.CANCEL;
} catch (IOException e) {
shutdownStatusCode = GOAWAY_PROTOCOL_ERROR;
rstStatusCode = SpdyStream.RST_PROTOCOL_ERROR;
connectionErrorCode = ErrorCode.PROTOCOL_ERROR;
streamErrorCode = ErrorCode.PROTOCOL_ERROR;
} finally {
try {
close(shutdownStatusCode, rstStatusCode);
close(connectionErrorCode, streamErrorCode);
} catch (IOException ignored) {
}
}
@@ -443,7 +438,7 @@ public final class SpdyConnection implements Closeable {
throws IOException {
SpdyStream dataStream = getStream(streamId);
if (dataStream == null) {
writeSynResetLater(streamId, SpdyStream.RST_INVALID_STREAM);
writeSynResetLater(streamId, ErrorCode.INVALID_STREAM);
Util.skipByReading(in, length);
return;
}
@@ -467,7 +462,7 @@ public final class SpdyConnection implements Closeable {
previous = streams.put(streamId, synStream);
}
if (previous != null) {
previous.closeLater(SpdyStream.RST_PROTOCOL_ERROR);
previous.closeLater(ErrorCode.PROTOCOL_ERROR);
removeStream(streamId);
return;
}
@@ -487,7 +482,7 @@ public final class SpdyConnection implements Closeable {
throws IOException {
SpdyStream replyStream = getStream(streamId);
if (replyStream == null) {
writeSynResetLater(streamId, SpdyStream.RST_INVALID_STREAM);
writeSynResetLater(streamId, ErrorCode.INVALID_STREAM);
return;
}
replyStream.receiveReply(nameValueBlock);
@@ -504,10 +499,10 @@ public final class SpdyConnection implements Closeable {
}
}
@Override public void rstStream(int streamId, int statusCode) {
@Override public void rstStream(int streamId, ErrorCode errorCode) {
SpdyStream rstStream = removeStream(streamId);
if (rstStream != null) {
rstStream.receiveRstStream(statusCode);
rstStream.receiveRstStream(errorCode);
}
}
@@ -528,6 +523,7 @@ public final class SpdyConnection implements Closeable {
// The synchronization here is ugly. We need to synchronize on 'this' to guard
// reads to 'settings'. We synchronize on 'stream' to guard the state change.
// And we need to acquire the 'stream' lock first, since that may block.
// TODO: this can block the reader thread until a write completes. That's bad!
synchronized (stream) {
synchronized (SpdyConnection.this) {
stream.receiveSettings(settings);
@@ -540,19 +536,19 @@ public final class SpdyConnection implements Closeable {
@Override public void noop() {
}
@Override public void ping(int streamId) {
if (client != (streamId % 2 == 1)) {
// Respond to a client ping if this is a server and vice versa.
writePingLater(streamId, null);
} else {
Ping ping = removePing(streamId);
@Override public void ping(boolean reply, int payload1, int payload2) {
if (reply) {
Ping ping = removePing(payload1);
if (ping != null) {
ping.receive();
}
} else {
// Send a reply to a client ping if this is a server and vice versa.
writePingLater(true, payload1, payload2, null);
}
}
@Override public void goAway(int lastGoodStreamId, int statusCode) {
@Override public void goAway(int lastGoodStreamId, ErrorCode errorCode) {
synchronized (SpdyConnection.this) {
shutdown = true;
@@ -562,18 +558,28 @@ public final class SpdyConnection implements Closeable {
Map.Entry<Integer, SpdyStream> entry = i.next();
int streamId = entry.getKey();
if (streamId > lastGoodStreamId && entry.getValue().isLocallyInitiated()) {
entry.getValue().receiveRstStream(SpdyStream.RST_REFUSED_STREAM);
entry.getValue().receiveRstStream(ErrorCode.REFUSED_STREAM);
i.remove();
}
}
}
}
@Override public void windowUpdate(int streamId, int deltaWindowSize) {
@Override public void windowUpdate(int streamId, int deltaWindowSize, boolean endFlowControl) {
if (streamId == 0) {
// TODO: honor whole-stream flow control
return;
}
// TODO: honor endFlowControl
SpdyStream stream = getStream(streamId);
if (stream != null) {
stream.receiveWindowUpdate(deltaWindowSize);
}
}
@Override public void priority(int streamId, int priority) {
// TODO: honor priority.
}
}
}

View File

@@ -33,33 +33,6 @@ public final class SpdyStream {
// Internal state is guarded by this. No long-running or potentially
// blocking operations are performed while the lock is held.
private static final String[] STATUS_CODE_NAMES = {
null,
"PROTOCOL_ERROR",
"INVALID_STREAM",
"REFUSED_STREAM",
"UNSUPPORTED_VERSION",
"CANCEL",
"INTERNAL_ERROR",
"FLOW_CONTROL_ERROR",
"STREAM_IN_USE",
"STREAM_ALREADY_CLOSED",
"INVALID_CREDENTIALS",
"FRAME_TOO_LARGE"
};
public static final int RST_PROTOCOL_ERROR = 1;
public static final int RST_INVALID_STREAM = 2;
public static final int RST_REFUSED_STREAM = 3;
public static final int RST_UNSUPPORTED_VERSION = 4;
public static final int RST_CANCEL = 5;
public static final int RST_INTERNAL_ERROR = 6;
public static final int RST_FLOW_CONTROL_ERROR = 7;
public static final int RST_STREAM_IN_USE = 8;
public static final int RST_STREAM_ALREADY_CLOSED = 9;
public static final int RST_INVALID_CREDENTIALS = 10;
public static final int RST_FRAME_TOO_LARGE = 11;
/**
* The number of unacknowledged bytes at which the input stream will send
* the peer a {@code WINDOW_UPDATE} frame. Must be less than this client's
@@ -89,7 +62,7 @@ public final class SpdyStream {
* reasons to abnormally close this stream (such as both peers closing it
* near-simultaneously) then this is the first reason known to this peer.
*/
private int rstStatusCode = -1;
private ErrorCode errorCode = null;
SpdyStream(int id, SpdyConnection connection, boolean outFinished, boolean inFinished,
int priority, int slot, List<String> requestHeaders, Settings settings) {
@@ -117,7 +90,7 @@ public final class SpdyStream {
* reports itself as not open. This is because input data is buffered.
*/
public synchronized boolean isOpen() {
if (rstStatusCode != -1) {
if (errorCode != null) {
return false;
}
if ((in.finished || in.closed) && (out.finished || out.closed) && responseHeaders != null) {
@@ -146,13 +119,13 @@ public final class SpdyStream {
*/
public synchronized List<String> getResponseHeaders() throws IOException {
try {
while (responseHeaders == null && rstStatusCode == -1) {
while (responseHeaders == null && errorCode == null) {
wait();
}
if (responseHeaders != null) {
return responseHeaders;
}
throw new IOException("stream was reset: " + rstStatusString());
throw new IOException("stream was reset: " + errorCode);
} catch (InterruptedException e) {
InterruptedIOException rethrow = new InterruptedIOException();
rethrow.initCause(e);
@@ -161,15 +134,11 @@ public final class SpdyStream {
}
/**
* Returns the reason why this stream was closed, or -1 if it closed
* normally or has not yet been closed. Valid reasons are {@link
* #RST_PROTOCOL_ERROR}, {@link #RST_INVALID_STREAM}, {@link
* #RST_REFUSED_STREAM}, {@link #RST_UNSUPPORTED_VERSION}, {@link
* #RST_CANCEL}, {@link #RST_INTERNAL_ERROR} and {@link
* #RST_FLOW_CONTROL_ERROR}.
* Returns the reason why this stream was closed, or null if it closed
* normally or has not yet been closed.
*/
public synchronized int getRstStatusCode() {
return rstStatusCode;
public synchronized ErrorCode getErrorCode() {
return errorCode;
}
/**
@@ -236,7 +205,7 @@ public final class SpdyStream {
* Abnormally terminate this stream. This blocks until the {@code RST_STREAM}
* frame has been transmitted.
*/
public void close(int rstStatusCode) throws IOException {
public void close(ErrorCode rstStatusCode) throws IOException {
if (!closeInternal(rstStatusCode)) {
return; // Already closed.
}
@@ -247,24 +216,24 @@ public final class SpdyStream {
* Abnormally terminate this stream. This enqueues a {@code RST_STREAM}
* frame and returns immediately.
*/
public void closeLater(int rstStatusCode) {
if (!closeInternal(rstStatusCode)) {
public void closeLater(ErrorCode errorCode) {
if (!closeInternal(errorCode)) {
return; // Already closed.
}
connection.writeSynResetLater(id, rstStatusCode);
connection.writeSynResetLater(id, errorCode);
}
/** Returns true if this stream was closed. */
private boolean closeInternal(int rstStatusCode) {
private boolean closeInternal(ErrorCode errorCode) {
assert (!Thread.holdsLock(this));
synchronized (this) {
if (this.rstStatusCode != -1) {
if (this.errorCode != null) {
return false;
}
if (in.finished && out.finished) {
return false;
}
this.rstStatusCode = rstStatusCode;
this.errorCode = errorCode;
notifyAll();
}
connection.removeStream(id);
@@ -285,7 +254,7 @@ public final class SpdyStream {
}
}
if (streamInUseError) {
closeLater(SpdyStream.RST_STREAM_IN_USE);
closeLater(ErrorCode.STREAM_IN_USE);
} else if (!open) {
connection.removeStream(id);
}
@@ -305,7 +274,7 @@ public final class SpdyStream {
}
}
if (protocolError) {
closeLater(SpdyStream.RST_PROTOCOL_ERROR);
closeLater(ErrorCode.PROTOCOL_ERROR);
}
}
@@ -327,14 +296,16 @@ public final class SpdyStream {
}
}
synchronized void receiveRstStream(int statusCode) {
if (rstStatusCode == -1) {
rstStatusCode = statusCode;
synchronized void receiveRstStream(ErrorCode errorCode) {
if (this.errorCode == null) {
this.errorCode = errorCode;
notifyAll();
}
}
private void setSettings(Settings settings) {
// TODO: For HTTP/2.0, also adjust the stream flow control window size
// by the difference between the new value and the old value.
assert (Thread.holdsLock(connection)); // Because 'settings' is guarded by 'connection'.
this.writeWindowSize = settings != null
? settings.getInitialWindowSize(Settings.DEFAULT_INITIAL_WINDOW_SIZE)
@@ -352,11 +323,6 @@ public final class SpdyStream {
notifyAll();
}
private String rstStatusString() {
return rstStatusCode > 0 && rstStatusCode < STATUS_CODE_NAMES.length
? STATUS_CODE_NAMES[rstStatusCode] : Integer.toString(rstStatusCode);
}
int getPriority() {
return priority;
}
@@ -484,7 +450,7 @@ public final class SpdyStream {
remaining = readTimeoutMillis;
}
try {
while (pos == -1 && !finished && !closed && rstStatusCode == -1) {
while (pos == -1 && !finished && !closed && errorCode == null) {
if (readTimeoutMillis == 0) {
SpdyStream.this.wait();
} else if (remaining > 0) {
@@ -522,7 +488,7 @@ public final class SpdyStream {
// If the peer sends more data than we can handle, discard it and close the connection.
if (flowControlError) {
Util.skipByReading(in, byteCount);
closeLater(SpdyStream.RST_FLOW_CONTROL_ERROR);
closeLater(ErrorCode.FLOW_CONTROL_ERROR);
return;
}
@@ -571,8 +537,8 @@ public final class SpdyStream {
if (closed) {
throw new IOException("stream closed");
}
if (rstStatusCode != -1) {
throw new IOException("stream was reset: " + rstStatusString());
if (errorCode != null) {
throw new IOException("stream was reset: " + errorCode);
}
}
}
@@ -590,7 +556,7 @@ public final class SpdyStream {
// is safe because the input stream is closed (we won't use any
// further bytes) and the output stream is either finished or closed
// (so RSTing both streams doesn't cause harm).
SpdyStream.this.close(RST_CANCEL);
SpdyStream.this.close(ErrorCode.CANCEL);
} else if (!open) {
connection.removeStream(id);
}
@@ -693,8 +659,8 @@ public final class SpdyStream {
throw new IOException("stream closed");
} else if (finished) {
throw new IOException("stream finished");
} else if (rstStatusCode != -1) {
throw new IOException("stream was reset: " + rstStatusString());
} else if (errorCode != null) {
throw new IOException("stream was reset: " + errorCode);
}
}
} catch (InterruptedException e) {
@@ -708,8 +674,8 @@ public final class SpdyStream {
throw new IOException("stream closed");
} else if (finished) {
throw new IOException("stream finished");
} else if (rstStatusCode != -1) {
throw new IOException("stream was reset: " + rstStatusString());
} else if (errorCode != null) {
throw new IOException("stream was reset: " + errorCode);
}
}
}

View File

@@ -23,6 +23,6 @@ interface Variant {
Variant SPDY3 = new Spdy3();
Variant HTTP_20_DRAFT_04 = new Http20Draft04();
FrameReader newReader(InputStream in);
FrameWriter newWriter(OutputStream out);
FrameReader newReader(InputStream in, boolean client);
FrameWriter newWriter(OutputStream out, boolean client);
}

View File

@@ -49,7 +49,7 @@ public final class MockSpdyPeer implements Closeable {
public MockSpdyPeer(boolean client) {
this.client = client;
this.frameWriter = Variant.SPDY3.newWriter(bytesOut);
this.frameWriter = Variant.SPDY3.newWriter(bytesOut, client);
}
public void acceptFrame() {
@@ -100,7 +100,7 @@ public final class MockSpdyPeer implements Closeable {
socket = serverSocket.accept();
OutputStream out = socket.getOutputStream();
InputStream in = socket.getInputStream();
FrameReader reader = Variant.SPDY3.newReader(in);
FrameReader reader = Variant.SPDY3.newReader(in, client);
Iterator<OutFrame> outFramesIterator = outFrames.iterator();
byte[] outBytes = bytesOut.toByteArray();
@@ -168,7 +168,6 @@ public final class MockSpdyPeer implements Closeable {
public final int sequence;
public final FrameReader reader;
public int type = -1;
public int flags = -1;
public boolean clearPrevious;
public boolean outFinished;
public boolean inFinished;
@@ -176,7 +175,7 @@ public final class MockSpdyPeer implements Closeable {
public int associatedStreamId;
public int priority;
public int slot;
public int statusCode;
public ErrorCode errorCode;
public int deltaWindowSize;
public List<String> nameValueBlock;
public byte[] data;
@@ -232,17 +231,17 @@ public final class MockSpdyPeer implements Closeable {
Util.readFully(in, this.data);
}
@Override public void rstStream(int streamId, int statusCode) {
@Override public void rstStream(int streamId, ErrorCode errorCode) {
if (this.type != -1) throw new IllegalStateException();
this.type = Spdy3.TYPE_RST_STREAM;
this.streamId = streamId;
this.statusCode = statusCode;
this.errorCode = errorCode;
}
@Override public void ping(int streamId) {
@Override public void ping(boolean reply, int payload1, int payload2) {
if (this.type != -1) throw new IllegalStateException();
this.type = Spdy3.TYPE_PING;
this.streamId = streamId;
this.streamId = payload1;
}
@Override public void noop() {
@@ -250,18 +249,22 @@ public final class MockSpdyPeer implements Closeable {
this.type = Spdy3.TYPE_NOOP;
}
@Override public void goAway(int lastGoodStreamId, int statusCode) {
@Override public void goAway(int lastGoodStreamId, ErrorCode errorCode) {
if (this.type != -1) throw new IllegalStateException();
this.type = Spdy3.TYPE_GOAWAY;
this.streamId = lastGoodStreamId;
this.statusCode = statusCode;
this.errorCode = errorCode;
}
@Override public void windowUpdate(int streamId, int deltaWindowSize) {
@Override public void windowUpdate(int streamId, int deltaWindowSize, boolean endFlowControl) {
if (this.type != -1) throw new IllegalStateException();
this.type = Spdy3.TYPE_WINDOW_UPDATE;
this.streamId = streamId;
this.deltaWindowSize = deltaWindowSize;
}
@Override public void priority(int streamId, int priority) {
throw new UnsupportedOperationException();
}
}
}

View File

@@ -29,6 +29,13 @@ import org.junit.After;
import org.junit.Test;
import static com.squareup.okhttp.internal.Util.UTF_8;
import static com.squareup.okhttp.internal.spdy.ErrorCode.CANCEL;
import static com.squareup.okhttp.internal.spdy.ErrorCode.FLOW_CONTROL_ERROR;
import static com.squareup.okhttp.internal.spdy.ErrorCode.INTERNAL_ERROR;
import static com.squareup.okhttp.internal.spdy.ErrorCode.INVALID_STREAM;
import static com.squareup.okhttp.internal.spdy.ErrorCode.PROTOCOL_ERROR;
import static com.squareup.okhttp.internal.spdy.ErrorCode.REFUSED_STREAM;
import static com.squareup.okhttp.internal.spdy.ErrorCode.STREAM_IN_USE;
import static com.squareup.okhttp.internal.spdy.Settings.PERSIST_VALUE;
import static com.squareup.okhttp.internal.spdy.Spdy3.TYPE_DATA;
import static com.squareup.okhttp.internal.spdy.Spdy3.TYPE_GOAWAY;
@@ -38,13 +45,6 @@ import static com.squareup.okhttp.internal.spdy.Spdy3.TYPE_RST_STREAM;
import static com.squareup.okhttp.internal.spdy.Spdy3.TYPE_SYN_REPLY;
import static com.squareup.okhttp.internal.spdy.Spdy3.TYPE_SYN_STREAM;
import static com.squareup.okhttp.internal.spdy.Spdy3.TYPE_WINDOW_UPDATE;
import static com.squareup.okhttp.internal.spdy.SpdyConnection.GOAWAY_INTERNAL_ERROR;
import static com.squareup.okhttp.internal.spdy.SpdyConnection.GOAWAY_PROTOCOL_ERROR;
import static com.squareup.okhttp.internal.spdy.SpdyStream.RST_FLOW_CONTROL_ERROR;
import static com.squareup.okhttp.internal.spdy.SpdyStream.RST_INVALID_STREAM;
import static com.squareup.okhttp.internal.spdy.SpdyStream.RST_PROTOCOL_ERROR;
import static com.squareup.okhttp.internal.spdy.SpdyStream.RST_REFUSED_STREAM;
import static com.squareup.okhttp.internal.spdy.SpdyStream.RST_STREAM_IN_USE;
import static com.squareup.okhttp.internal.spdy.SpdyStream.WINDOW_UPDATE_THRESHOLD;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -108,7 +108,7 @@ public final class SpdyConnectionTest {
peer.acceptFrame(); // SYN_STREAM
peer.acceptFrame(); // PING
peer.sendFrame().synReply(true, 1, Arrays.asList("a", "android"));
peer.sendFrame().ping(1);
peer.sendFrame().ping(true, 1, 0);
peer.play();
// play it back
@@ -137,7 +137,7 @@ public final class SpdyConnectionTest {
@Override public void receive(SpdyStream stream) throws IOException {
receiveCount.incrementAndGet();
assertEquals(Arrays.asList("a", "android"), stream.getRequestHeaders());
assertEquals(-1, stream.getRstStatusCode());
assertEquals(null, stream.getErrorCode());
assertEquals(5, stream.getPriority());
assertEquals(129, stream.getSlot());
stream.reply(Arrays.asList("b", "banana"), true);
@@ -196,7 +196,7 @@ public final class SpdyConnectionTest {
@Test public void serverPingsClient() throws Exception {
// write the mocking script
peer.sendFrame().ping(2);
peer.sendFrame().ping(false, 2, 0);
peer.acceptFrame(); // PING
peer.play();
@@ -212,13 +212,13 @@ public final class SpdyConnectionTest {
@Test public void clientPingsServer() throws Exception {
// write the mocking script
peer.acceptFrame(); // PING
peer.sendFrame().ping(1);
peer.sendFrame().ping(true, 1, 0);
peer.play();
// play it back
SpdyConnection connection =
new SpdyConnection.Builder(true, peer.openSocket()).handler(REJECT_INCOMING_STREAMS)
.build();
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket())
.handler(REJECT_INCOMING_STREAMS)
.build();
Ping ping = connection.ping();
assertTrue(ping.roundTripTime() > 0);
assertTrue(ping.roundTripTime() < TimeUnit.SECONDS.toNanos(1));
@@ -231,10 +231,10 @@ public final class SpdyConnectionTest {
@Test public void unexpectedPingIsNotReturned() throws Exception {
// write the mocking script
peer.sendFrame().ping(2);
peer.sendFrame().ping(false, 2, 0);
peer.acceptFrame(); // PING
peer.sendFrame().ping(3); // This ping will not be returned.
peer.sendFrame().ping(4);
peer.sendFrame().ping(true, 3, 0); // This ping will not be returned.
peer.sendFrame().ping(false, 4, 0);
peer.acceptFrame(); // PING
peer.play();
@@ -253,14 +253,14 @@ public final class SpdyConnectionTest {
Settings settings = new Settings();
settings.set(Settings.MAX_CONCURRENT_STREAMS, PERSIST_VALUE, 10);
peer.sendFrame().settings(settings);
peer.sendFrame().ping(2);
peer.sendFrame().ping(false, 2, 0);
peer.acceptFrame(); // PING
peer.play();
// play it back
SpdyConnection connection =
new SpdyConnection.Builder(true, peer.openSocket()).handler(REJECT_INCOMING_STREAMS)
.build();
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket())
.handler(REJECT_INCOMING_STREAMS)
.build();
peer.takeFrame(); // Guarantees that the Settings frame has been processed.
synchronized (connection) {
@@ -280,7 +280,7 @@ public final class SpdyConnectionTest {
settings2.set(Settings.DOWNLOAD_RETRANS_RATE, PERSIST_VALUE, 500);
settings2.set(Settings.MAX_CONCURRENT_STREAMS, PERSIST_VALUE, 600);
peer.sendFrame().settings(settings2);
peer.sendFrame().ping(2);
peer.sendFrame().ping(false, 2, 0);
peer.acceptFrame();
peer.play();
@@ -306,7 +306,7 @@ public final class SpdyConnectionTest {
// write the mocking script
peer.sendFrame().data(true, 42, "bogus".getBytes("UTF-8"));
peer.acceptFrame(); // RST_STREAM
peer.sendFrame().ping(2);
peer.sendFrame().ping(false, 2, 0);
peer.acceptFrame(); // PING
peer.play();
@@ -317,7 +317,7 @@ public final class SpdyConnectionTest {
MockSpdyPeer.InFrame rstStream = peer.takeFrame();
assertEquals(TYPE_RST_STREAM, rstStream.type);
assertEquals(42, rstStream.streamId);
assertEquals(RST_INVALID_STREAM, rstStream.statusCode);
assertEquals(INVALID_STREAM, rstStream.errorCode);
MockSpdyPeer.InFrame ping = peer.takeFrame();
assertEquals(2, ping.streamId);
}
@@ -326,7 +326,7 @@ public final class SpdyConnectionTest {
// write the mocking script
peer.sendFrame().synReply(false, 42, Arrays.asList("a", "android"));
peer.acceptFrame(); // RST_STREAM
peer.sendFrame().ping(2);
peer.sendFrame().ping(false, 2, 0);
peer.acceptFrame(); // PING
peer.play();
@@ -337,7 +337,7 @@ public final class SpdyConnectionTest {
MockSpdyPeer.InFrame rstStream = peer.takeFrame();
assertEquals(TYPE_RST_STREAM, rstStream.type);
assertEquals(42, rstStream.streamId);
assertEquals(RST_INVALID_STREAM, rstStream.statusCode);
assertEquals(INVALID_STREAM, rstStream.errorCode);
MockSpdyPeer.InFrame ping = peer.takeFrame();
assertEquals(2, ping.streamId);
}
@@ -349,7 +349,7 @@ public final class SpdyConnectionTest {
peer.acceptFrame(); // TYPE_DATA
peer.acceptFrame(); // TYPE_DATA with FLAG_FIN
peer.acceptFrame(); // PING
peer.sendFrame().ping(1);
peer.sendFrame().ping(true, 1, 0);
peer.play();
// play it back
@@ -391,9 +391,9 @@ public final class SpdyConnectionTest {
@Test public void serverClosesClientOutputStream() throws Exception {
// write the mocking script
peer.acceptFrame(); // SYN_STREAM
peer.sendFrame().rstStream(1, SpdyStream.RST_CANCEL);
peer.sendFrame().rstStream(1, CANCEL);
peer.acceptFrame(); // PING
peer.sendFrame().ping(1);
peer.sendFrame().ping(true, 1, 0);
peer.acceptFrame(); // DATA
peer.play();
@@ -467,7 +467,7 @@ public final class SpdyConnectionTest {
assertFalse(synStream.outFinished);
MockSpdyPeer.InFrame rstStream = peer.takeFrame();
assertEquals(TYPE_RST_STREAM, rstStream.type);
assertEquals(SpdyStream.RST_CANCEL, rstStream.statusCode);
assertEquals(CANCEL, rstStream.errorCode);
}
/**
@@ -515,7 +515,7 @@ public final class SpdyConnectionTest {
assertFalse(fin.outFinished);
MockSpdyPeer.InFrame rstStream = peer.takeFrame();
assertEquals(TYPE_RST_STREAM, rstStream.type);
assertEquals(SpdyStream.RST_CANCEL, rstStream.statusCode);
assertEquals(CANCEL, rstStream.errorCode);
}
@Test public void serverClosesClientInputStream() throws Exception {
@@ -547,7 +547,7 @@ public final class SpdyConnectionTest {
peer.sendFrame().synReply(false, 1, Arrays.asList("a", "android"));
peer.acceptFrame(); // PING
peer.sendFrame().synReply(false, 1, Arrays.asList("b", "banana"));
peer.sendFrame().ping(1);
peer.sendFrame().ping(true, 1, 0);
peer.acceptFrame(); // RST_STREAM
peer.play();
@@ -571,7 +571,7 @@ public final class SpdyConnectionTest {
MockSpdyPeer.InFrame rstStream = peer.takeFrame();
assertEquals(TYPE_RST_STREAM, rstStream.type);
assertEquals(1, rstStream.streamId);
assertEquals(RST_STREAM_IN_USE, rstStream.statusCode);
assertEquals(STREAM_IN_USE, rstStream.errorCode);
}
@Test public void remoteDoubleSynStream() throws Exception {
@@ -588,7 +588,7 @@ public final class SpdyConnectionTest {
@Override public void receive(SpdyStream stream) throws IOException {
receiveCount.incrementAndGet();
assertEquals(Arrays.asList("a", "android"), stream.getRequestHeaders());
assertEquals(-1, stream.getRstStatusCode());
assertEquals(null, stream.getErrorCode());
stream.reply(Arrays.asList("c", "cola"), true);
}
};
@@ -600,7 +600,7 @@ public final class SpdyConnectionTest {
MockSpdyPeer.InFrame rstStream = peer.takeFrame();
assertEquals(TYPE_RST_STREAM, rstStream.type);
assertEquals(2, rstStream.streamId);
assertEquals(RST_PROTOCOL_ERROR, rstStream.statusCode);
assertEquals(PROTOCOL_ERROR, rstStream.errorCode);
assertEquals(1, receiveCount.intValue());
}
@@ -610,7 +610,7 @@ public final class SpdyConnectionTest {
peer.sendFrame().synReply(false, 1, Arrays.asList("a", "android"));
peer.sendFrame().data(true, 1, "robot".getBytes("UTF-8"));
peer.sendFrame().data(true, 1, "c3po".getBytes("UTF-8")); // Ignored.
peer.sendFrame().ping(2); // Ping just to make sure the stream was fastforwarded.
peer.sendFrame().ping(false, 2, 0); // Ping just to make sure the stream was fastforwarded.
peer.acceptFrame(); // PING
peer.play();
@@ -634,7 +634,7 @@ public final class SpdyConnectionTest {
peer.sendFrame().synReply(false, 1, Arrays.asList("b", "banana"));
peer.sendFrame().data(false, 1, new byte[64 * 1024 + 1]);
peer.acceptFrame(); // RST_STREAM
peer.sendFrame().ping(2); // Ping just to make sure the stream was fastforwarded.
peer.sendFrame().ping(false, 2, 0); // Ping just to make sure the stream was fastforwarded.
peer.acceptFrame(); // PING
peer.play();
@@ -649,7 +649,7 @@ public final class SpdyConnectionTest {
MockSpdyPeer.InFrame rstStream = peer.takeFrame();
assertEquals(TYPE_RST_STREAM, rstStream.type);
assertEquals(1, rstStream.streamId);
assertEquals(RST_FLOW_CONTROL_ERROR, rstStream.statusCode);
assertEquals(FLOW_CONTROL_ERROR, rstStream.errorCode);
MockSpdyPeer.InFrame ping = peer.takeFrame();
assertEquals(TYPE_PING, ping.type);
assertEquals(2, ping.streamId);
@@ -658,8 +658,8 @@ public final class SpdyConnectionTest {
@Test public void remoteSendsRefusedStreamBeforeReplyHeaders() throws Exception {
// write the mocking script
peer.acceptFrame(); // SYN_STREAM
peer.sendFrame().rstStream(1, RST_REFUSED_STREAM);
peer.sendFrame().ping(2);
peer.sendFrame().rstStream(1, REFUSED_STREAM);
peer.sendFrame().ping(false, 2, 0);
peer.acceptFrame(); // PING
peer.play();
@@ -686,9 +686,9 @@ public final class SpdyConnectionTest {
// write the mocking script
peer.acceptFrame(); // SYN_STREAM 1
peer.acceptFrame(); // SYN_STREAM 3
peer.sendFrame().goAway(1, GOAWAY_PROTOCOL_ERROR);
peer.sendFrame().goAway(1, PROTOCOL_ERROR);
peer.acceptFrame(); // PING
peer.sendFrame().ping(1);
peer.sendFrame().ping(true, 1, 0);
peer.acceptFrame(); // DATA STREAM 1
peer.play();
@@ -733,14 +733,14 @@ public final class SpdyConnectionTest {
peer.acceptFrame(); // GOAWAY
peer.acceptFrame(); // PING
peer.sendFrame().synStream(false, false, 2, 0, 0, 0, Arrays.asList("b", "b")); // Should be ignored!
peer.sendFrame().ping(1);
peer.sendFrame().ping(true, 1, 0);
peer.play();
// play it back
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build();
connection.newStream(Arrays.asList("a", "android"), true, true);
Ping ping = connection.ping();
connection.shutdown(GOAWAY_PROTOCOL_ERROR);
connection.shutdown(PROTOCOL_ERROR);
assertEquals(1, connection.openStreamCount());
ping.roundTripTime(); // Prevent the peer from exiting prematurely.
@@ -752,7 +752,7 @@ public final class SpdyConnectionTest {
MockSpdyPeer.InFrame goaway = peer.takeFrame();
assertEquals(TYPE_GOAWAY, goaway.type);
assertEquals(0, goaway.streamId);
assertEquals(GOAWAY_PROTOCOL_ERROR, goaway.statusCode);
assertEquals(PROTOCOL_ERROR, goaway.errorCode);
}
@Test public void noPingsAfterShutdown() throws Exception {
@@ -762,7 +762,7 @@ public final class SpdyConnectionTest {
// play it back
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build();
connection.shutdown(GOAWAY_INTERNAL_ERROR);
connection.shutdown(INTERNAL_ERROR);
try {
connection.ping();
fail();
@@ -773,7 +773,7 @@ public final class SpdyConnectionTest {
// verify the peer received what was expected
MockSpdyPeer.InFrame goaway = peer.takeFrame();
assertEquals(TYPE_GOAWAY, goaway.type);
assertEquals(GOAWAY_INTERNAL_ERROR, goaway.statusCode);
assertEquals(INTERNAL_ERROR, goaway.errorCode);
}
@Test public void close() throws Exception {
@@ -836,7 +836,7 @@ public final class SpdyConnectionTest {
peer.acceptFrame(); // SYN_STREAM
peer.sendFrame().synReply(false, 1, Arrays.asList("a", "android"));
peer.acceptFrame(); // PING
peer.sendFrame().ping(1);
peer.sendFrame().ping(true, 1, 0);
peer.play();
// play it back
@@ -866,7 +866,7 @@ public final class SpdyConnectionTest {
peer.acceptFrame(); // PING
peer.sendFrame().synReply(false, 1, Arrays.asList("a", "android"));
peer.sendFrame().headers(1, Arrays.asList("c", "c3po"));
peer.sendFrame().ping(1);
peer.sendFrame().ping(true, 1, 0);
peer.play();
// play it back
@@ -888,7 +888,7 @@ public final class SpdyConnectionTest {
peer.acceptFrame(); // PING
peer.sendFrame().headers(1, Arrays.asList("c", "c3po"));
peer.acceptFrame(); // RST_STREAM
peer.sendFrame().ping(1);
peer.sendFrame().ping(true, 1, 0);
peer.play();
// play it back
@@ -909,7 +909,7 @@ public final class SpdyConnectionTest {
assertEquals(TYPE_PING, ping.type);
MockSpdyPeer.InFrame rstStream = peer.takeFrame();
assertEquals(TYPE_RST_STREAM, rstStream.type);
assertEquals(RST_PROTOCOL_ERROR, rstStream.statusCode);
assertEquals(PROTOCOL_ERROR, rstStream.errorCode);
}
@Test public void readSendsWindowUpdate() throws Exception {

View File

@@ -16,6 +16,7 @@
package com.squareup.okhttp.internal.http;
import com.squareup.okhttp.internal.spdy.ErrorCode;
import com.squareup.okhttp.internal.spdy.SpdyConnection;
import com.squareup.okhttp.internal.spdy.SpdyStream;
import java.io.IOException;
@@ -84,7 +85,7 @@ public final class SpdyTransport implements Transport {
InputStream responseBodyIn) {
if (streamCanceled) {
if (stream != null) {
stream.closeLater(SpdyStream.RST_CANCEL);
stream.closeLater(ErrorCode.CANCEL);
return true;
} else {
// If stream is null, it either means that writeRequestHeaders wasn't called