From 4a62aef3f8a0bcd42c730d02191e89f2d91a2dad Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Mon, 20 Jan 2014 18:25:08 -0800 Subject: [PATCH] Basic support for reading and writing http/2 push promise frames. --- .../okhttp/internal/spdy/FrameReader.java | 18 ++++ .../okhttp/internal/spdy/FrameWriter.java | 18 ++++ .../okhttp/internal/spdy/Http20Draft09.java | 49 +++++++---- .../squareup/okhttp/internal/spdy/Spdy3.java | 6 ++ .../okhttp/internal/spdy/SpdyConnection.java | 18 ++++ .../okhttp/internal/spdy/BaseTestHandler.java | 5 ++ .../internal/spdy/Http20Draft09Test.java | 83 +++++++++++++++++++ .../okhttp/internal/spdy/MockSpdyPeer.java | 10 ++- .../internal/spdy/SpdyConnectionTest.java | 32 ++++++- .../okhttp/internal/http/HttpEngine.java | 2 +- 10 files changed, 221 insertions(+), 20 deletions(-) diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/FrameReader.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/FrameReader.java index 8db3438ff..3d97841f3 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/FrameReader.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/FrameReader.java @@ -52,5 +52,23 @@ public interface FrameReader extends Closeable { void goAway(int lastGoodStreamId, ErrorCode errorCode); void windowUpdate(int streamId, int deltaWindowSize, boolean endFlowControl); void priority(int streamId, int priority); + + /** + * HTTP/2 only. Receive a push promise header block. + *

+ * A push promise contains all the headers that pertain to a server-initiated + * request, and a {@code promisedStreamId} to which response frames will be + * delivered. Push promise frames are sent as a part of the response to + * {@code streamId}. The {@code promisedStreamId} has a priority of one + * greater than {@code streamId}. + * + * @param streamId client-initiated stream ID. Must be an odd number. + * @param promisedStreamId server-initiated stream ID. Must be an even + * number. + * @param requestHeaders minimally includes {@code :method}, {@code :scheme}, + * {@code :authority}, and (@code :path}. + */ + void pushPromise(int streamId, int promisedStreamId, List

requestHeaders) + throws IOException; } } diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/FrameWriter.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/FrameWriter.java index 79469c417..7c1c329fe 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/FrameWriter.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/FrameWriter.java @@ -26,6 +26,24 @@ public interface FrameWriter extends Closeable { void connectionHeader() throws IOException; void ackSettings() throws IOException; + /** + * HTTP/2 only. Send a push promise header block. + *

+ * A push promise contains all the headers that pertain to a server-initiated + * request, and a {@code promisedStreamId} to which response frames will be + * delivered. Push promise frames are sent as a part of the response to + * {@code streamId}. The {@code promisedStreamId} has a priority of one + * greater than {@code streamId}. + * + * @param streamId client-initiated stream ID. Must be an odd number. + * @param promisedStreamId server-initiated stream ID. Must be an even + * number. + * @param requestHeaders minimally includes {@code :method}, {@code :scheme}, + * {@code :authority}, and (@code :path}. + */ + void pushPromise(int streamId, int promisedStreamId, List

requestHeaders) + throws IOException; + /** SPDY/3 only. */ void flush() throws IOException; void synStream(boolean outFinished, boolean inFinished, int streamId, int associatedStreamId, diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Http20Draft09.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Http20Draft09.java index f386d1701..1797cbd92 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Http20Draft09.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Http20Draft09.java @@ -24,7 +24,6 @@ import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.io.UnsupportedEncodingException; import java.util.Arrays; import java.util.List; @@ -40,11 +39,7 @@ public final class Http20Draft09 implements Variant { // http://tools.ietf.org/html/draft-ietf-httpbis-http2-09#section-6.5 @Override public Settings defaultOkHttpSettings(boolean client) { - Settings settings = initialPeerSettings(client); - if (client) { // TODO: we don't yet support reading push. - settings.set(Settings.ENABLE_PUSH, 0, 0); - } - return settings; + return initialPeerSettings(client); } @Override public Settings initialPeerSettings(boolean client) { @@ -57,14 +52,8 @@ public final class Http20Draft09 implements Variant { return settings; } - private static final byte[] CONNECTION_HEADER; - static { - try { - CONNECTION_HEADER = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes("UTF-8"); - } catch (UnsupportedEncodingException e) { - throw new AssertionError(); - } - } + private static final byte[] CONNECTION_HEADER = + "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes(Util.UTF_8); static final int TYPE_DATA = 0x0; static final int TYPE_HEADERS = 0x1; @@ -79,8 +68,9 @@ public final class Http20Draft09 implements Variant { static final int FLAG_END_STREAM = 0x1; - /** Used for headers, push-promise and continuation. */ + /** Used for headers and continuation. */ static final int FLAG_END_HEADERS = 0x4; + static final int FLAG_END_PUSH_PROMISE = 0x4; static final int FLAG_PRIORITY = 0x8; static final int FLAG_ACK = 0x1; static final int FLAG_END_FLOW_CONTROL = 0x1; @@ -252,8 +242,17 @@ public final class Http20Draft09 implements Variant { } } - private void readPushPromise(Handler handler, int flags, int length, int streamId) { - // TODO: + private void readPushPromise(Handler handler, int flags, int length, int streamId) + throws IOException { + if (streamId == 0) { + throw ioException("PROTOCOL_ERROR: TYPE_PUSH_PROMISE streamId == 0"); + } + boolean endHeaders = (flags & FLAG_END_PUSH_PROMISE) != 0; + + int promisedStreamId = in.readInt() & 0x7fffffff; + List
headerBlock = readHeaderBlock(length, endHeaders, streamId); + + handler.pushPromise(streamId, promisedStreamId, headerBlock); } private void readPing(Handler handler, int flags, int length, int streamId) throws IOException { @@ -341,6 +340,22 @@ public final class Http20Draft09 implements Variant { headers(false, streamId, -1, headerBlock); } + @Override + public void pushPromise(int streamId, int promisedStreamId, List
requestHeaders) + throws IOException { + hpackBuffer.reset(); + hpackWriter.writeHeaders(requestHeaders); + int type = TYPE_PUSH_PROMISE; + // TODO: implement CONTINUATION + int length = hpackBuffer.size(); + checkFrameSize(length); + int flags = FLAG_END_HEADERS; + out.writeInt((length & 0x3fff) << 16 | (type & 0xff) << 8 | (flags & 0xff)); + out.writeInt(streamId & 0x7fffffff); + out.writeInt(promisedStreamId & 0x7fffffff); + hpackBuffer.writeTo(out); + } + private void headers(boolean outFinished, int streamId, int priority, List
headerBlock) throws IOException { hpackBuffer.reset(); diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Spdy3.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Spdy3.java index 30541e5c4..6688625f3 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Spdy3.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Spdy3.java @@ -322,6 +322,12 @@ final class Spdy3 implements Variant { // Do nothing: no ACK for SPDY/3 settings. } + @Override + public void pushPromise(int streamId, int promisedStreamId, List
requestHeaders) + throws IOException { + // Do nothing: no push promise for SPDY/3. + } + @Override public synchronized void connectionHeader() { // Do nothing: no connection header for SPDY/3. } diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java index fb860dc9d..504d49fc0 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java @@ -630,5 +630,23 @@ public final class SpdyConnection implements Closeable { @Override public void priority(int streamId, int priority) { // TODO: honor priority. } + + @Override + public void pushPromise(int streamId, int promisedStreamId, List
requestHeaders) + throws IOException { + // TODO: Wire up properly and only cancel when local settings disable push. + cancelStreamLater(promisedStreamId); + } + + private void cancelStreamLater(final int streamId) { + executor.submit(new NamedRunnable("OkHttp %s Cancelling Stream %s", hostName, streamId) { + @Override public void execute() { + try { + frameWriter.rstStream(streamId, ErrorCode.CANCEL); + } catch (IOException ignored) { + } + } + }); + } } } diff --git a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/BaseTestHandler.java b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/BaseTestHandler.java index c3fe9677a..7daa60ff2 100644 --- a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/BaseTestHandler.java +++ b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/BaseTestHandler.java @@ -61,4 +61,9 @@ class BaseTestHandler implements FrameReader.Handler { @Override public void priority(int streamId, int priority) { fail(); } + + @Override + public void pushPromise(int streamId, int associatedStreamId, List
headerBlock) { + fail(); + } } diff --git a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/Http20Draft09Test.java b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/Http20Draft09Test.java index e66a48f24..2fdf3c6be 100644 --- a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/Http20Draft09Test.java +++ b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/Http20Draft09Test.java @@ -19,6 +19,7 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.util.Arrays; import java.util.List; import org.junit.Test; @@ -146,6 +147,88 @@ public class Http20Draft09Test { }); } + @Test public void pushPromise() throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + DataOutputStream dataOut = new DataOutputStream(out); + + final int expectedPromisedStreamId = 11; + + final List
pushPromise = Arrays.asList( + new Header(Header.TARGET_METHOD, "GET"), + new Header(Header.TARGET_SCHEME, "https"), + new Header(Header.TARGET_AUTHORITY, "squareup.com"), + new Header(Header.TARGET_PATH, "/") + ); + + { // Write the push promise frame, specifying the associated stream ID. + byte[] headerBytes = literalHeaders(pushPromise); + dataOut.writeShort(headerBytes.length); + dataOut.write(Http20Draft09.TYPE_PUSH_PROMISE); + dataOut.write(Http20Draft09.FLAG_END_PUSH_PROMISE); + dataOut.writeInt(expectedStreamId & 0x7fffffff); + dataOut.writeInt(expectedPromisedStreamId & 0x7fffffff); + dataOut.write(headerBytes); + } + + FrameReader fr = newReader(out); + + // Consume the headers frame. + fr.nextFrame(new BaseTestHandler() { + @Override + public void pushPromise(int streamId, int promisedStreamId, List
headerBlock) { + assertEquals(expectedStreamId, streamId); + assertEquals(expectedPromisedStreamId, promisedStreamId); + assertEquals(pushPromise, headerBlock); + } + }); + } + + /** Headers are compressed, then framed. */ + @Test public void pushPromiseThenContinuation() throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + DataOutputStream dataOut = new DataOutputStream(out); + + final int expectedPromisedStreamId = 11; + + final List
pushPromise = Arrays.asList( + new Header(Header.TARGET_METHOD, "GET"), + new Header(Header.TARGET_SCHEME, "https"), + new Header(Header.TARGET_AUTHORITY, "squareup.com"), + new Header(Header.TARGET_PATH, "/") + ); + + // Decoding the first header will cross frame boundaries. + byte[] headerBlock = literalHeaders(pushPromise); + { // Write the first headers frame. + dataOut.writeShort(headerBlock.length / 2); + dataOut.write(Http20Draft09.TYPE_PUSH_PROMISE); + dataOut.write(0); // no flags + dataOut.writeInt(expectedStreamId & 0x7fffffff); + dataOut.writeInt(expectedPromisedStreamId & 0x7fffffff); + dataOut.write(headerBlock, 0, headerBlock.length / 2); + } + + { // Write the continuation frame, specifying no more frames are expected. + dataOut.writeShort(headerBlock.length / 2); + dataOut.write(Http20Draft09.TYPE_CONTINUATION); + dataOut.write(Http20Draft09.FLAG_END_HEADERS); + dataOut.writeInt(expectedStreamId & 0x7fffffff); + dataOut.write(headerBlock, headerBlock.length / 2, headerBlock.length / 2); + } + + FrameReader fr = newReader(out); + + // Reading the above frames should result in a concatenated headerBlock. + fr.nextFrame(new BaseTestHandler() { + @Override + public void pushPromise(int streamId, int promisedStreamId, List
headerBlock) { + assertEquals(expectedStreamId, streamId); + assertEquals(expectedPromisedStreamId, promisedStreamId); + assertEquals(pushPromise, headerBlock); + } + }); + } + @Test public void readRstStreamFrame() throws IOException { ByteArrayOutputStream out = new ByteArrayOutputStream(); DataOutputStream dataOut = new DataOutputStream(out); diff --git a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/MockSpdyPeer.java b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/MockSpdyPeer.java index fa4920e11..f83c18a98 100644 --- a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/MockSpdyPeer.java +++ b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/MockSpdyPeer.java @@ -262,5 +262,13 @@ public final class MockSpdyPeer implements Closeable { @Override public void priority(int streamId, int priority) { throw new UnsupportedOperationException(); } + + @Override + public void pushPromise(int streamId, int associatedStreamId, List
headerBlock) { + this.type = Http20Draft09.TYPE_PUSH_PROMISE; + this.streamId = streamId; + this.associatedStreamId = associatedStreamId; + this.headerBlock = headerBlock; + } } -} \ No newline at end of file +} diff --git a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/SpdyConnectionTest.java b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/SpdyConnectionTest.java index 190d33ee7..e8f1c0c5b 100644 --- a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/SpdyConnectionTest.java +++ b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/SpdyConnectionTest.java @@ -13,7 +13,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package com.squareup.okhttp.internal.spdy; import com.squareup.okhttp.Protocol; @@ -25,6 +24,7 @@ import java.io.InputStream; import java.io.InterruptedIOException; import java.io.OutputStream; import java.util.Arrays; +import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.junit.After; @@ -1075,6 +1075,36 @@ public final class SpdyConnectionTest { assertStreamData("robot", stream.getInputStream()); } + // TODO: change this to only cancel when local settings disable push + @Test public void pushPromiseStreamsAutomaticallyCancel() throws Exception { + MockSpdyPeer peer = new MockSpdyPeer(Variant.HTTP_20_DRAFT_09, false); + + // write the mocking script + peer.sendFrame().pushPromise(1, 2, Arrays.asList( + new Header(Header.TARGET_METHOD, "GET"), + new Header(Header.TARGET_SCHEME, "https"), + new Header(Header.TARGET_AUTHORITY, "squareup.com"), + new Header(Header.TARGET_PATH, "/cached") + )); + peer.sendFrame().synReply(true, 1, Arrays.asList( + new Header(Header.RESPONSE_STATUS, "200") + )); + peer.acceptFrame(); // RST_STREAM + peer.play(); + + // play it back + SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()) + .protocol(Protocol.HTTP_2) + .handler(REJECT_INCOMING_STREAMS) + .build(); + + // verify the peer received what was expected + MockSpdyPeer.InFrame rstStream = peer.takeFrame(); + assertEquals(TYPE_RST_STREAM, rstStream.type); + assertEquals(2, rstStream.streamId); + assertEquals(CANCEL, rstStream.errorCode); + } + private SpdyConnection sendHttp2SettingsAndCheckForAck(boolean client, Settings settings) throws IOException, InterruptedException { MockSpdyPeer peer = new MockSpdyPeer(Variant.HTTP_20_DRAFT_09, client); diff --git a/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpEngine.java b/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpEngine.java index 4a698781e..3e7dcbdba 100644 --- a/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpEngine.java +++ b/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpEngine.java @@ -333,7 +333,7 @@ public class HttpEngine { private boolean isRecoverable(IOException e) { // If the problem was a CertificateException from the X509TrustManager, - // do not retry, we didn't have an abrupt server initiated exception. + // do not retry, we didn't have an abrupt server-initiated exception. boolean sslFailure = e instanceof SSLHandshakeException && e.getCause() instanceof CertificateException; boolean protocolFailure = e instanceof ProtocolException;