From a5ba3e9062f3bde4bb6896f1db433a2408119e94 Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Sat, 1 Mar 2014 10:27:00 -0800 Subject: [PATCH] Add HTTP/2 PushObserver. --- .../okhttp/mockwebserver/MockWebServer.java | 2 +- .../internal/http/ExternalHttp2Example.java | 3 +- .../internal/spdy/SpdyConnectionTest.java | 195 +++++++++++++----- .../okhttp/internal/spdy/PushObserver.java | 85 ++++++++ .../okhttp/internal/spdy/SpdyConnection.java | 143 ++++++++++--- 5 files changed, 354 insertions(+), 74 deletions(-) create mode 100644 okhttp/src/main/java/com/squareup/okhttp/internal/spdy/PushObserver.java diff --git a/mockwebserver/src/main/java/com/squareup/okhttp/mockwebserver/MockWebServer.java b/mockwebserver/src/main/java/com/squareup/okhttp/mockwebserver/MockWebServer.java index f90b2125c..802129019 100644 --- a/mockwebserver/src/main/java/com/squareup/okhttp/mockwebserver/MockWebServer.java +++ b/mockwebserver/src/main/java/com/squareup/okhttp/mockwebserver/MockWebServer.java @@ -729,7 +729,7 @@ public final class MockWebServer { if (headerParts.length != 2) { throw new AssertionError("Unexpected header: " + header); } - pushedHeaders.add(new Header(headerParts[0], headerParts[1])); + pushedHeaders.add(new Header(headerParts[0], headerParts[1].trim())); } String requestLine = pushPromise.getMethod() + ' ' + pushPromise.getPath() + " HTTP/1.1"; List chunkSizes = Collections.emptyList(); // No chunked encoding for SPDY. diff --git a/okhttp-tests/src/test/java/com/squareup/okhttp/internal/http/ExternalHttp2Example.java b/okhttp-tests/src/test/java/com/squareup/okhttp/internal/http/ExternalHttp2Example.java index c8d9f22df..1c5198c69 100644 --- a/okhttp-tests/src/test/java/com/squareup/okhttp/internal/http/ExternalHttp2Example.java +++ b/okhttp-tests/src/test/java/com/squareup/okhttp/internal/http/ExternalHttp2Example.java @@ -30,7 +30,7 @@ import static com.squareup.okhttp.internal.http.OkHeaders.SELECTED_PROTOCOL; public final class ExternalHttp2Example { public static void main(String[] args) throws Exception { - URL url = new URL("https://twitter.com/"); + URL url = new URL("https://http2.iijplus.jp/push/test1"); HttpsURLConnection connection = (HttpsURLConnection) new OkHttpClient() .setProtocols(Protocol.HTTP2_AND_HTTP_11).open(url); @@ -48,6 +48,7 @@ public final class ExternalHttp2Example { if (protocolValues != null && !protocolValues.isEmpty()) { System.out.println("PROTOCOL " + protocolValues.get(0)); } + BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8")); String line; diff --git a/okhttp-tests/src/test/java/com/squareup/okhttp/internal/spdy/SpdyConnectionTest.java b/okhttp-tests/src/test/java/com/squareup/okhttp/internal/spdy/SpdyConnectionTest.java index 413c9546c..dbaea5200 100644 --- a/okhttp-tests/src/test/java/com/squareup/okhttp/internal/spdy/SpdyConnectionTest.java +++ b/okhttp-tests/src/test/java/com/squareup/okhttp/internal/spdy/SpdyConnectionTest.java @@ -18,10 +18,13 @@ package com.squareup.okhttp.internal.spdy; import com.squareup.okhttp.internal.Util; import java.io.IOException; import java.io.InterruptedIOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import okio.BufferedSink; +import okio.BufferedSource; import okio.ByteString; import okio.OkBuffer; import okio.Okio; @@ -53,12 +56,6 @@ import static org.junit.Assert.fail; public final class SpdyConnectionTest { private static final Variant SPDY3 = new Spdy3(); private static final Variant HTTP_20_DRAFT_09 = new Http20Draft09(); - - private static final IncomingStreamHandler REJECT_INCOMING_STREAMS = new IncomingStreamHandler() { - @Override public void receive(SpdyStream stream) throws IOException { - throw new AssertionError(); - } - }; private final MockSpdyPeer peer = new MockSpdyPeer(); @After public void tearDown() throws Exception { @@ -75,7 +72,7 @@ public final class SpdyConnectionTest { peer.play(); // play it back - SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); + SpdyConnection connection = connection(peer, SPDY3); SpdyStream stream = connection.newStream(headerEntries("b", "banana"), true, true); assertEquals(headerEntries("a", "android"), stream.getResponseHeaders()); assertStreamData("robot", stream.getSource()); @@ -102,7 +99,7 @@ public final class SpdyConnectionTest { peer.sendFrame().synReply(false, 1, headerEntries("b", "banana")); peer.play(); - SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); + SpdyConnection connection = connection(peer, SPDY3); SpdyStream stream = connection.newStream(headerEntries("a", "android"), false, false); assertEquals(1, connection.openStreamCount()); assertEquals(headerEntries("b", "banana"), stream.getResponseHeaders()); @@ -118,7 +115,7 @@ public final class SpdyConnectionTest { peer.play(); // play it back - SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); + SpdyConnection connection = connection(peer, SPDY3); connection.newStream(headerEntries("b", "banana"), false, true); assertEquals(1, connection.openStreamCount()); connection.ping().roundTripTime(); // Ensure that the SYN_REPLY has been received. @@ -133,8 +130,16 @@ public final class SpdyConnectionTest { } @Test public void serverCreatesStreamAndClientReplies() throws Exception { + final List
pushHeaders = headerEntries( + ":scheme", "https", + ":host", "localhost:8888", + ":method", "GET", + ":path", "/index.html", + ":status", "200", + ":version", "HTTP/1.1", + "content-type", "text/html"); // write the mocking script - peer.sendFrame().synStream(false, false, 2, 0, 5, 129, headerEntries("a", "android")); + peer.sendFrame().synStream(false, false, 2, 0, 5, 129, pushHeaders); peer.acceptFrame(); // SYN_REPLY peer.play(); @@ -143,7 +148,7 @@ public final class SpdyConnectionTest { IncomingStreamHandler handler = new IncomingStreamHandler() { @Override public void receive(SpdyStream stream) throws IOException { receiveCount.incrementAndGet(); - assertEquals(headerEntries("a", "android"), stream.getRequestHeaders()); + assertEquals(pushHeaders, stream.getRequestHeaders()); assertEquals(null, stream.getErrorCode()); assertEquals(5, stream.getPriority()); stream.reply(headerEntries("b", "banana"), true); @@ -162,17 +167,6 @@ public final class SpdyConnectionTest { } @Test public void replyWithNoData() throws Exception { - MockSpdyPeer.InFrame reply = replyWithNoData(SPDY3); - assertEquals(HeadersMode.SPDY_REPLY, reply.headersMode); - } - - @Test public void replyWithNoDataHttp2() throws Exception { - MockSpdyPeer.InFrame reply = replyWithNoData(HTTP_20_DRAFT_09); - assertEquals(HeadersMode.HTTP_20_HEADERS, reply.headersMode); - } - - private MockSpdyPeer.InFrame replyWithNoData(Variant variant) throws Exception { - peer.setVariantAndClient(variant, false); // write the mocking script peer.sendFrame().synStream(false, false, 2, 0, 0, 0, headerEntries("a", "android")); peer.acceptFrame(); // SYN_REPLY @@ -186,7 +180,8 @@ public final class SpdyConnectionTest { receiveCount.incrementAndGet(); } }; - connectionBuilder(peer, variant).handler(handler).build(); + + connectionBuilder(peer, SPDY3).handler(handler).build(); // verify the peer received what was expected MockSpdyPeer.InFrame reply = peer.takeFrame(); @@ -194,7 +189,7 @@ public final class SpdyConnectionTest { assertTrue(reply.inFinished); assertEquals(headerEntries("b", "banana"), reply.headerBlock); assertEquals(1, receiveCount.get()); - return reply; + assertEquals(HeadersMode.SPDY_REPLY, reply.headersMode); } @Test public void serverPingsClient() throws Exception { @@ -304,7 +299,7 @@ public final class SpdyConnectionTest { assertTrue(ackFrame.ack); // This stream was created *after* the connection settings were adjusted. - SpdyStream stream = connection.newStream(headerEntries("a", "android"), true, true); + SpdyStream stream = connection.newStream(headerEntries("a", "android"), false, true); assertEquals(3368, connection.peerSettings.getInitialWindowSize()); assertEquals(1684, connection.bytesLeftInWriteWindow); // initial wasn't affected. @@ -412,7 +407,7 @@ public final class SpdyConnectionTest { @Test public void bogusDataFrameDoesNotDisruptConnection() throws Exception { // write the mocking script - peer.sendFrame().data(true, 42, new OkBuffer().writeUtf8("bogus")); + peer.sendFrame().data(true, 41, new OkBuffer().writeUtf8("bogus")); peer.acceptFrame(); // RST_STREAM peer.sendFrame().ping(false, 2, 0); peer.acceptFrame(); // PING @@ -424,7 +419,7 @@ public final class SpdyConnectionTest { // verify the peer received what was expected MockSpdyPeer.InFrame rstStream = peer.takeFrame(); assertEquals(TYPE_RST_STREAM, rstStream.type); - assertEquals(42, rstStream.streamId); + assertEquals(41, rstStream.streamId); assertEquals(INVALID_STREAM, rstStream.errorCode); MockSpdyPeer.InFrame ping = peer.takeFrame(); assertEquals(2, ping.payload1); @@ -432,7 +427,7 @@ public final class SpdyConnectionTest { @Test public void bogusReplyFrameDoesNotDisruptConnection() throws Exception { // write the mocking script - peer.sendFrame().synReply(false, 42, headerEntries("a", "android")); + peer.sendFrame().synReply(false, 41, headerEntries("a", "android")); peer.acceptFrame(); // RST_STREAM peer.sendFrame().ping(false, 2, 0); peer.acceptFrame(); // PING @@ -444,7 +439,7 @@ public final class SpdyConnectionTest { // verify the peer received what was expected MockSpdyPeer.InFrame rstStream = peer.takeFrame(); assertEquals(TYPE_RST_STREAM, rstStream.type); - assertEquals(42, rstStream.streamId); + assertEquals(41, rstStream.streamId); assertEquals(INVALID_STREAM, rstStream.errorCode); MockSpdyPeer.InFrame ping = peer.takeFrame(); assertEquals(2, ping.payload1); @@ -656,7 +651,7 @@ public final class SpdyConnectionTest { peer.play(); // play it back - SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); + SpdyConnection connection = connection(peer, SPDY3); SpdyStream stream = connection.newStream(headerEntries("c", "cola"), true, true); assertEquals(headerEntries("a", "android"), stream.getResponseHeaders()); connection.ping().roundTripTime(); // Ensure that the 2nd SYN REPLY has been received. @@ -721,7 +716,7 @@ public final class SpdyConnectionTest { peer.play(); // play it back - SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); + SpdyConnection connection = connection(peer, SPDY3); SpdyStream stream = connection.newStream(headerEntries("b", "banana"), true, true); assertEquals(headerEntries("a", "android"), stream.getResponseHeaders()); assertStreamData("robot", stream.getSource()); @@ -745,7 +740,7 @@ public final class SpdyConnectionTest { peer.play(); // play it back - SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); + SpdyConnection connection = connection(peer, SPDY3); SpdyStream stream = connection.newStream(headerEntries("a", "android"), true, true); assertEquals(headerEntries("b", "banana"), stream.getResponseHeaders()); @@ -767,7 +762,7 @@ public final class SpdyConnectionTest { peer.play(); // play it back - SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); + SpdyConnection connection = connection(peer, SPDY3); SpdyStream stream = connection.newStream(headerEntries("a", "android"), true, true); try { stream.getResponseHeaders(); @@ -855,7 +850,7 @@ public final class SpdyConnectionTest { peer.play(); // play it back - SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); + SpdyConnection connection = connection(peer, SPDY3); connection.newStream(headerEntries("a", "android"), true, true); Ping ping = connection.ping(); connection.shutdown(PROTOCOL_ERROR); @@ -879,7 +874,7 @@ public final class SpdyConnectionTest { peer.play(); // play it back - SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); + SpdyConnection connection = connection(peer, SPDY3); connection.shutdown(INTERNAL_ERROR); try { connection.ping(); @@ -902,7 +897,7 @@ public final class SpdyConnectionTest { peer.play(); // play it back - SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); + SpdyConnection connection = connection(peer, SPDY3); SpdyStream stream = connection.newStream(headerEntries("a", "android"), true, true); assertEquals(1, connection.openStreamCount()); connection.close(); @@ -946,7 +941,7 @@ public final class SpdyConnectionTest { peer.play(); // play it back - SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); + SpdyConnection connection = connection(peer, SPDY3); Ping ping = connection.ping(); connection.close(); assertEquals(-1, ping.roundTripTime()); @@ -961,7 +956,7 @@ public final class SpdyConnectionTest { peer.play(); // play it back - SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); + SpdyConnection connection = connection(peer, SPDY3); SpdyStream stream = connection.newStream(headerEntries("b", "banana"), true, true); stream.setReadTimeout(1000); Source source = stream.getSource(); @@ -991,7 +986,7 @@ public final class SpdyConnectionTest { peer.play(); // play it back - SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); + SpdyConnection connection = connection(peer, SPDY3); SpdyStream stream = connection.newStream(headerEntries("b", "banana"), true, true); connection.ping().roundTripTime(); // Ensure that the HEADERS has been received. assertEquals(headerEntries("a", "android", "c", "c3po"), stream.getResponseHeaders()); @@ -1014,7 +1009,7 @@ public final class SpdyConnectionTest { peer.play(); // play it back - SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); + SpdyConnection connection = connection(peer, SPDY3); SpdyStream stream = connection.newStream(headerEntries("b", "banana"), true, true); connection.ping().roundTripTime(); // Ensure that the HEADERS has been received. try { @@ -1168,7 +1163,7 @@ public final class SpdyConnectionTest { peer.play(); // Play it back. - SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); + SpdyConnection connection = connection(peer, SPDY3); SpdyStream stream = connection.newStream(headerEntries("b", "banana"), true, true); BufferedSink out = Okio.buffer(stream.getSink()); out.write(new byte[INITIAL_WINDOW_SIZE]); @@ -1210,7 +1205,7 @@ public final class SpdyConnectionTest { peer.play(); // Play it back. - SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); + SpdyConnection connection = connection(peer, SPDY3); SpdyStream stream = connection.newStream(headerEntries("a", "apple"), true, true); BufferedSink out = Okio.buffer(stream.getSink()); out.write(new byte[INITIAL_WINDOW_SIZE]); @@ -1255,7 +1250,7 @@ public final class SpdyConnectionTest { peer.play(); // play it back - SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); + SpdyConnection connection = connection(peer, SPDY3); SpdyStream stream = connection.newStream(headerEntries("b", "banana"), true, true); assertEquals(headerEntries("a", "android"), stream.getResponseHeaders()); Source in = stream.getSource(); @@ -1280,7 +1275,7 @@ public final class SpdyConnectionTest { peer.play(); // Play it back. - SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); + SpdyConnection connection = connection(peer, SPDY3); SpdyStream stream1 = connection.newStream(headerEntries("a", "apple"), true, true); BufferedSink out1 = Okio.buffer(stream1.getSink()); out1.write(new byte[INITIAL_WINDOW_SIZE]); @@ -1397,14 +1392,93 @@ public final class SpdyConnectionTest { peer.play(); // play it back - SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); + SpdyConnection connection = connection(peer, SPDY3); SpdyStream stream = connection.newStream(headerEntries("b", "banana"), true, true); assertEquals("a", stream.getResponseHeaders().get(0).name.utf8()); assertEquals(length, stream.getResponseHeaders().get(0).value.size()); assertStreamData("robot", stream.getSource()); } - // TODO: change this to only cancel when local settings disable push + @Test public void pushPromiseStream() throws Exception { + peer.setVariantAndClient(HTTP_20_DRAFT_09, false); + + // write the mocking script + peer.acceptFrame(); // SYN_STREAM + peer.sendFrame().synReply(false, 1, headerEntries("a", "android")); + final List
expectedRequestHeaders = Arrays.asList( + new Header(Header.TARGET_METHOD, "GET"), + new Header(Header.TARGET_SCHEME, "https"), + new Header(Header.TARGET_AUTHORITY, "squareup.com"), + new Header(Header.TARGET_PATH, "/cached") + ); + peer.sendFrame().pushPromise(1, 2, expectedRequestHeaders); + final List
expectedResponseHeaders = Arrays.asList( + new Header(Header.RESPONSE_STATUS, "200") + ); + peer.sendFrame().synReply(true, 2, expectedResponseHeaders); + peer.sendFrame().data(true, 1, data(0)); + peer.play(); + + final List events = new ArrayList(); + PushObserver observer = new PushObserver() { + + @Override public boolean onRequest(int streamId, List
requestHeaders) { + assertEquals(2, streamId); + events.add(requestHeaders); + return false; + } + + @Override public boolean onHeaders(int streamId, List
responseHeaders, boolean last) { + assertEquals(2, streamId); + assertTrue(last); + events.add(responseHeaders); + return false; + } + + @Override public boolean onData(int streamId, BufferedSource source, int byteCount, + boolean last) throws IOException { + events.add(new AssertionError("onData")); + return false; + } + + @Override public void onReset(int streamId, ErrorCode errorCode) { + events.add(new AssertionError("onReset")); + } + }; + + // play it back + SpdyConnection connection = connectionBuilder(peer, HTTP_20_DRAFT_09) + .pushObserver(observer).build(); + SpdyStream client = connection.newStream(headerEntries("b", "banana"), false, true); + assertEquals(-1, client.getSource().read(new OkBuffer(), 1)); + + // verify the peer received what was expected + assertEquals(TYPE_HEADERS, peer.takeFrame().type); + + assertEquals(2, events.size()); + assertEquals(expectedRequestHeaders, events.get(0)); + assertEquals(expectedResponseHeaders, events.get(1)); + } + + @Test public void doublePushPromise() throws Exception { + peer.setVariantAndClient(HTTP_20_DRAFT_09, false); + + // write the mocking script + peer.sendFrame().pushPromise(1,2, headerEntries("a", "android")); + peer.acceptFrame(); // SYN_REPLY + peer.sendFrame().pushPromise(1, 2, headerEntries("b", "banana")); + peer.acceptFrame(); // RST_STREAM + peer.play(); + + // play it back + SpdyConnection connection = connectionBuilder(peer, HTTP_20_DRAFT_09).build(); + connection.newStream(headerEntries("b", "banana"), false, true); + + // verify the peer received what was expected + assertEquals(TYPE_HEADERS, peer.takeFrame().type); + assertEquals(PROTOCOL_ERROR, peer.takeFrame().errorCode); + } + @Test public void pushPromiseStreamsAutomaticallyCancel() throws Exception { peer.setVariantAndClient(HTTP_20_DRAFT_09, false); @@ -1422,7 +1496,8 @@ public final class SpdyConnectionTest { peer.play(); // play it back - connection(peer, HTTP_20_DRAFT_09); + connectionBuilder(peer, HTTP_20_DRAFT_09) + .pushObserver(PushObserver.CANCEL).build(); // verify the peer received what was expected MockSpdyPeer.InFrame rstStream = peer.takeFrame(); @@ -1450,12 +1525,14 @@ public final class SpdyConnectionTest { } private SpdyConnection connection(MockSpdyPeer peer, Variant variant) throws IOException { - return connectionBuilder(peer, variant).handler(REJECT_INCOMING_STREAMS).build(); + return connectionBuilder(peer, variant).build(); } private SpdyConnection.Builder connectionBuilder(MockSpdyPeer peer, Variant variant) throws IOException { - return new SpdyConnection.Builder(true, peer.openSocket()).protocol(variant.getProtocol()); + return new SpdyConnection.Builder(true, peer.openSocket()) + .pushObserver(IGNORE) + .protocol(variant.getProtocol()); } private void assertStreamData(String expected, Source source) throws IOException { @@ -1493,4 +1570,24 @@ public final class SpdyConnectionTest { static int roundUp(int num, int divisor) { return (num + divisor - 1) / divisor; } + + static final PushObserver IGNORE = new PushObserver() { + + @Override public boolean onRequest(int streamId, List
requestHeaders) { + return false; + } + + @Override public boolean onHeaders(int streamId, List
responseHeaders, boolean last) { + return false; + } + + @Override public boolean onData(int streamId, BufferedSource source, int byteCount, + boolean last) throws IOException { + source.skip(byteCount); + return false; + } + + @Override public void onReset(int streamId, ErrorCode errorCode) { + } + }; } diff --git a/okhttp/src/main/java/com/squareup/okhttp/internal/spdy/PushObserver.java b/okhttp/src/main/java/com/squareup/okhttp/internal/spdy/PushObserver.java new file mode 100644 index 000000000..bf5ca76e0 --- /dev/null +++ b/okhttp/src/main/java/com/squareup/okhttp/internal/spdy/PushObserver.java @@ -0,0 +1,85 @@ +/* + * Copyright (C) 2014 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.squareup.okhttp.internal.spdy; + +import java.io.IOException; +import java.util.List; +import okio.BufferedSource; + +/** + * {@link com.squareup.okhttp.Protocol#HTTP_2 HTTP/2} only. + * Processes server-initiated HTTP requestd on the client. + * + *

Use the stream ID to correlate response headers and data. + * + *

Return true to request cancellation of a pushed stream. Note that this + * does not guarantee future frames won't arrive on the stream ID. + */ +public interface PushObserver { + /** + * Describes the request that the server intends to push a response for. + * + * @param streamId server-initiated stream ID: an even number. + * @param requestHeaders minimally includes {@code :method}, {@code :scheme}, + * {@code :authority}, and (@code :path}. + */ + boolean onRequest(int streamId, List

requestHeaders); + + /** + * The response headers corresponding to a pushed request. When {@code last} + * is true, there are no data frames to follow. + * + * @param streamId server-initiated stream ID: an even number. + * @param responseHeaders minimally includes {@code :status}. + * @param last when true, there is no response data. + */ + boolean onHeaders(int streamId, List
responseHeaders, boolean last); + + /** + * A chunk of response data corresponding to a pushed request. This data + * must either be read or skipped. + * + * @param streamId server-initiated stream ID: an even number. + * @param source location of data corresponding with this stream ID. + * @param byteCount number of bytes to read or skip from the source. + * @param last when true, there are no data frames to follow. + */ + boolean onData(int streamId, BufferedSource source, int byteCount, boolean last) + throws IOException; + + /** Indicates the reason why this stream was cancelled. */ + void onReset(int streamId, ErrorCode errorCode); + + PushObserver CANCEL = new PushObserver() { + + @Override public boolean onRequest(int streamId, List
requestHeaders) { + return true; + } + + @Override public boolean onHeaders(int streamId, List
responseHeaders, boolean last) { + return true; + } + + @Override public boolean onData(int streamId, BufferedSource source, int byteCount, + boolean last) throws IOException { + source.skip(byteCount); + return true; + } + + @Override public void onReset(int streamId, ErrorCode errorCode) { + } + }; +} diff --git a/okhttp/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java b/okhttp/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java index 7a41b906b..a5194b1f2 100644 --- a/okhttp/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java +++ b/okhttp/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java @@ -24,8 +24,10 @@ import java.io.InterruptedIOException; import java.net.Socket; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -83,6 +85,8 @@ public final class SpdyConnection implements Closeable { /** Lazily-created map of in-flight pings awaiting a response. Guarded by this. */ private Map pings; + /** User code to run in response to push promise events. */ + private final PushObserver pushObserver; private int nextPingId; static final int INITIAL_WINDOW_SIZE = 65535; @@ -123,6 +127,7 @@ public final class SpdyConnection implements Closeable { private SpdyConnection(Builder builder) { protocol = builder.protocol; + pushObserver = builder.pushObserver; client = builder.client; handler = builder.handler; nextStreamId = builder.client ? 1 : 2; @@ -157,7 +162,7 @@ public final class SpdyConnection implements Closeable { /** The protocol as selected using NPN or ALPN. */ public Protocol getProtocol() { - return protocol; + return protocol; } /** @@ -208,6 +213,7 @@ public final class SpdyConnection implements Closeable { public SpdyStream pushStream(int associatedStreamId, List
requestHeaders, boolean out) throws IOException { if (client) throw new IllegalStateException("Client cannot push requests."); + if (protocol != Protocol.HTTP_2) throw new IllegalStateException("protocol != HTTP_2"); return newStream(associatedStreamId, requestHeaders, out, false); } @@ -219,14 +225,13 @@ public final class SpdyConnection implements Closeable { * @param in true to create an input stream that the remote peer can use to send data to us. * Corresponds to {@code FLAG_UNIDIRECTIONAL}. */ - public SpdyStream newStream(List
requestHeaders, boolean out, - boolean in) throws IOException { + public SpdyStream newStream(List
requestHeaders, boolean out, boolean in) + throws IOException { return newStream(0, requestHeaders, out, in); } private SpdyStream newStream(int associatedStreamId, List
requestHeaders, boolean out, - boolean in) - throws IOException { + boolean in) throws IOException { boolean outFinished = !out; boolean inFinished = !in; int priority = -1; // TODO: permit the caller to specify a priority? @@ -250,7 +255,8 @@ public final class SpdyConnection implements Closeable { if (associatedStreamId == 0) { frameWriter.synStream(outFinished, inFinished, streamId, associatedStreamId, priority, slot, requestHeaders); - } else { + } else { // HTTP/2 has a PUSH_PROMISE frame. + if (client) throw new IOException("Client attempted to push stream: " + associatedStreamId); frameWriter.pushPromise(associatedStreamId, streamId, requestHeaders); } } @@ -491,6 +497,7 @@ public final class SpdyConnection implements Closeable { private BufferedSink sink; private IncomingStreamHandler handler = IncomingStreamHandler.REFUSE_INCOMING_STREAMS; private Protocol protocol = Protocol.SPDY_3; + private PushObserver pushObserver = PushObserver.CANCEL; private boolean client; public Builder(boolean client, Socket socket) throws IOException { @@ -519,6 +526,11 @@ public final class SpdyConnection implements Closeable { return this; } + public Builder pushObserver(PushObserver pushObserver) { + this.pushObserver = pushObserver; + return this; + } + public SpdyConnection build() { return new SpdyConnection(this); } @@ -557,6 +569,10 @@ public final class SpdyConnection implements Closeable { @Override public void data(boolean inFinished, int streamId, BufferedSource source, int length) throws IOException { + if (pushedStream(streamId)) { + pushDataLater(streamId, source, length, inFinished); + return; + } SpdyStream dataStream = getStream(streamId); if (dataStream == null) { writeSynResetLater(streamId, ErrorCode.INVALID_STREAM); @@ -570,8 +586,11 @@ public final class SpdyConnection implements Closeable { } @Override public void headers(boolean outFinished, boolean inFinished, int streamId, - int associatedStreamId, int priority, List
headerBlock, - HeadersMode headersMode) { + int associatedStreamId, int priority, List
headerBlock, HeadersMode headersMode) { + if (pushedStream(streamId)) { + pushHeadersLater(streamId, headerBlock, inFinished); + return; + } SpdyStream stream; synchronized (SpdyConnection.this) { // If we're shutdown, don't bother with this stream. @@ -623,6 +642,10 @@ public final class SpdyConnection implements Closeable { } @Override public void rstStream(int streamId, ErrorCode errorCode) { + if (pushedStream(streamId)) { + pushResetLater(streamId, errorCode); + return; + } SpdyStream rstStream = removeStream(streamId); if (rstStream != null) { rstStream.receiveRstStream(errorCode); @@ -730,21 +753,95 @@ public final class SpdyConnection implements Closeable { } @Override - public void pushPromise(int streamId, int promisedStreamId, List
requestHeaders) - throws IOException { - // TODO: Wire up properly and only cancel when local settings disable push. - cancelStreamLater(promisedStreamId); - } - - private void cancelStreamLater(final int streamId) { - executor.submit(new NamedRunnable("OkHttp %s Cancelling Stream %s", hostName, streamId) { - @Override public void execute() { - try { - frameWriter.rstStream(streamId, ErrorCode.CANCEL); - } catch (IOException ignored) { - } - } - }); + public void pushPromise(int streamId, int promisedStreamId, List
requestHeaders) { + pushRequestLater(promisedStreamId, requestHeaders); } } + + /** Even, positive numbered streams are pushed streams in HTTP/2. */ + private boolean pushedStream(int streamId) { + return protocol == Protocol.HTTP_2 && streamId != 0 && (streamId & 1) == 0; + } + + // Guarded by this. + private final Set currentPushRequests = new LinkedHashSet(); + + private void pushRequestLater(final int streamId, final List
requestHeaders) { + synchronized (this) { + if (currentPushRequests.contains(streamId)) { + writeSynResetLater(streamId, ErrorCode.PROTOCOL_ERROR); + return; + } + currentPushRequests.add(streamId); + } + executor.submit(new NamedRunnable("OkHttp %s Push Request[%s]", hostName, streamId) { + @Override public void execute() { + boolean cancel = pushObserver.onRequest(streamId, requestHeaders); + try { + if (cancel) { + frameWriter.rstStream(streamId, ErrorCode.CANCEL); + synchronized (SpdyConnection.this) { + currentPushRequests.remove(streamId); + } + } + } catch (IOException ignored) { + } + } + }); + } + + private void pushHeadersLater(final int streamId, final List
requestHeaders, + final boolean inFinished) { + executor.submit(new NamedRunnable("OkHttp %s Push Headers[%s]", hostName, streamId) { + @Override public void execute() { + boolean cancel = pushObserver.onHeaders(streamId, requestHeaders, inFinished); + try { + if (cancel) frameWriter.rstStream(streamId, ErrorCode.CANCEL); + if (cancel || inFinished) { + synchronized (SpdyConnection.this) { + currentPushRequests.remove(streamId); + } + } + } catch (IOException ignored) { + } + } + }); + } + + /** + * Eagerly reads {@code byteCount} bytes from the source before launching a background task to + * process the data. This avoids corrupting the stream. + */ + private void pushDataLater(final int streamId, final BufferedSource source, final int byteCount, + final boolean inFinished) throws IOException { + final OkBuffer buffer = new OkBuffer(); + source.require(byteCount); // Eagerly read the frame before firing client thread. + source.read(buffer, byteCount); + if (buffer.size() != byteCount) throw new IOException(buffer.size() + " != " + byteCount); + executor.submit(new NamedRunnable("OkHttp %s Push Data[%s]", hostName, streamId) { + @Override public void execute() { + try { + boolean cancel = pushObserver.onData(streamId, buffer, byteCount, inFinished); + if (cancel) frameWriter.rstStream(streamId, ErrorCode.CANCEL); + if (cancel || inFinished) { + synchronized (SpdyConnection.this) { + currentPushRequests.remove(streamId); + } + } + } catch (IOException ignored) { + } + } + }); + } + + private void pushResetLater(final int streamId, final ErrorCode errorCode) { + executor.submit(new NamedRunnable("OkHttp %s Push Reset[%s]", hostName, streamId) { + @Override public void execute() { + pushObserver.onReset(streamId, errorCode); + synchronized (SpdyConnection.this) { + currentPushRequests.remove(streamId); + } + } + }); + } }