1
0
mirror of https://github.com/square/okhttp.git synced 2026-01-21 03:41:07 +03:00

Merge pull request #566 from square/adrian.push-observer

Add HTTP/2 PushObserver
This commit is contained in:
Jesse Wilson
2014-03-02 14:55:24 -05:00
5 changed files with 354 additions and 74 deletions

View File

@@ -729,7 +729,7 @@ public final class MockWebServer {
if (headerParts.length != 2) {
throw new AssertionError("Unexpected header: " + header);
}
pushedHeaders.add(new Header(headerParts[0], headerParts[1]));
pushedHeaders.add(new Header(headerParts[0], headerParts[1].trim()));
}
String requestLine = pushPromise.getMethod() + ' ' + pushPromise.getPath() + " HTTP/1.1";
List<Integer> chunkSizes = Collections.emptyList(); // No chunked encoding for SPDY.

View File

@@ -30,7 +30,7 @@ import static com.squareup.okhttp.internal.http.OkHeaders.SELECTED_PROTOCOL;
public final class ExternalHttp2Example {
public static void main(String[] args) throws Exception {
URL url = new URL("https://twitter.com/");
URL url = new URL("https://http2.iijplus.jp/push/test1");
HttpsURLConnection connection = (HttpsURLConnection) new OkHttpClient()
.setProtocols(Protocol.HTTP2_AND_HTTP_11).open(url);
@@ -48,6 +48,7 @@ public final class ExternalHttp2Example {
if (protocolValues != null && !protocolValues.isEmpty()) {
System.out.println("PROTOCOL " + protocolValues.get(0));
}
BufferedReader reader =
new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
String line;

View File

@@ -18,10 +18,13 @@ package com.squareup.okhttp.internal.spdy;
import com.squareup.okhttp.internal.Util;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import okio.BufferedSink;
import okio.BufferedSource;
import okio.ByteString;
import okio.OkBuffer;
import okio.Okio;
@@ -53,12 +56,6 @@ import static org.junit.Assert.fail;
public final class SpdyConnectionTest {
private static final Variant SPDY3 = new Spdy3();
private static final Variant HTTP_20_DRAFT_09 = new Http20Draft09();
private static final IncomingStreamHandler REJECT_INCOMING_STREAMS = new IncomingStreamHandler() {
@Override public void receive(SpdyStream stream) throws IOException {
throw new AssertionError();
}
};
private final MockSpdyPeer peer = new MockSpdyPeer();
@After public void tearDown() throws Exception {
@@ -75,7 +72,7 @@ public final class SpdyConnectionTest {
peer.play();
// play it back
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build();
SpdyConnection connection = connection(peer, SPDY3);
SpdyStream stream = connection.newStream(headerEntries("b", "banana"), true, true);
assertEquals(headerEntries("a", "android"), stream.getResponseHeaders());
assertStreamData("robot", stream.getSource());
@@ -102,7 +99,7 @@ public final class SpdyConnectionTest {
peer.sendFrame().synReply(false, 1, headerEntries("b", "banana"));
peer.play();
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build();
SpdyConnection connection = connection(peer, SPDY3);
SpdyStream stream = connection.newStream(headerEntries("a", "android"), false, false);
assertEquals(1, connection.openStreamCount());
assertEquals(headerEntries("b", "banana"), stream.getResponseHeaders());
@@ -118,7 +115,7 @@ public final class SpdyConnectionTest {
peer.play();
// play it back
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build();
SpdyConnection connection = connection(peer, SPDY3);
connection.newStream(headerEntries("b", "banana"), false, true);
assertEquals(1, connection.openStreamCount());
connection.ping().roundTripTime(); // Ensure that the SYN_REPLY has been received.
@@ -133,8 +130,16 @@ public final class SpdyConnectionTest {
}
@Test public void serverCreatesStreamAndClientReplies() throws Exception {
final List<Header> pushHeaders = headerEntries(
":scheme", "https",
":host", "localhost:8888",
":method", "GET",
":path", "/index.html",
":status", "200",
":version", "HTTP/1.1",
"content-type", "text/html");
// write the mocking script
peer.sendFrame().synStream(false, false, 2, 0, 5, 129, headerEntries("a", "android"));
peer.sendFrame().synStream(false, false, 2, 0, 5, 129, pushHeaders);
peer.acceptFrame(); // SYN_REPLY
peer.play();
@@ -143,7 +148,7 @@ public final class SpdyConnectionTest {
IncomingStreamHandler handler = new IncomingStreamHandler() {
@Override public void receive(SpdyStream stream) throws IOException {
receiveCount.incrementAndGet();
assertEquals(headerEntries("a", "android"), stream.getRequestHeaders());
assertEquals(pushHeaders, stream.getRequestHeaders());
assertEquals(null, stream.getErrorCode());
assertEquals(5, stream.getPriority());
stream.reply(headerEntries("b", "banana"), true);
@@ -162,17 +167,6 @@ public final class SpdyConnectionTest {
}
@Test public void replyWithNoData() throws Exception {
MockSpdyPeer.InFrame reply = replyWithNoData(SPDY3);
assertEquals(HeadersMode.SPDY_REPLY, reply.headersMode);
}
@Test public void replyWithNoDataHttp2() throws Exception {
MockSpdyPeer.InFrame reply = replyWithNoData(HTTP_20_DRAFT_09);
assertEquals(HeadersMode.HTTP_20_HEADERS, reply.headersMode);
}
private MockSpdyPeer.InFrame replyWithNoData(Variant variant) throws Exception {
peer.setVariantAndClient(variant, false);
// write the mocking script
peer.sendFrame().synStream(false, false, 2, 0, 0, 0, headerEntries("a", "android"));
peer.acceptFrame(); // SYN_REPLY
@@ -186,7 +180,8 @@ public final class SpdyConnectionTest {
receiveCount.incrementAndGet();
}
};
connectionBuilder(peer, variant).handler(handler).build();
connectionBuilder(peer, SPDY3).handler(handler).build();
// verify the peer received what was expected
MockSpdyPeer.InFrame reply = peer.takeFrame();
@@ -194,7 +189,7 @@ public final class SpdyConnectionTest {
assertTrue(reply.inFinished);
assertEquals(headerEntries("b", "banana"), reply.headerBlock);
assertEquals(1, receiveCount.get());
return reply;
assertEquals(HeadersMode.SPDY_REPLY, reply.headersMode);
}
@Test public void serverPingsClient() throws Exception {
@@ -304,7 +299,7 @@ public final class SpdyConnectionTest {
assertTrue(ackFrame.ack);
// This stream was created *after* the connection settings were adjusted.
SpdyStream stream = connection.newStream(headerEntries("a", "android"), true, true);
SpdyStream stream = connection.newStream(headerEntries("a", "android"), false, true);
assertEquals(3368, connection.peerSettings.getInitialWindowSize());
assertEquals(1684, connection.bytesLeftInWriteWindow); // initial wasn't affected.
@@ -412,7 +407,7 @@ public final class SpdyConnectionTest {
@Test public void bogusDataFrameDoesNotDisruptConnection() throws Exception {
// write the mocking script
peer.sendFrame().data(true, 42, new OkBuffer().writeUtf8("bogus"));
peer.sendFrame().data(true, 41, new OkBuffer().writeUtf8("bogus"));
peer.acceptFrame(); // RST_STREAM
peer.sendFrame().ping(false, 2, 0);
peer.acceptFrame(); // PING
@@ -424,7 +419,7 @@ public final class SpdyConnectionTest {
// verify the peer received what was expected
MockSpdyPeer.InFrame rstStream = peer.takeFrame();
assertEquals(TYPE_RST_STREAM, rstStream.type);
assertEquals(42, rstStream.streamId);
assertEquals(41, rstStream.streamId);
assertEquals(INVALID_STREAM, rstStream.errorCode);
MockSpdyPeer.InFrame ping = peer.takeFrame();
assertEquals(2, ping.payload1);
@@ -432,7 +427,7 @@ public final class SpdyConnectionTest {
@Test public void bogusReplyFrameDoesNotDisruptConnection() throws Exception {
// write the mocking script
peer.sendFrame().synReply(false, 42, headerEntries("a", "android"));
peer.sendFrame().synReply(false, 41, headerEntries("a", "android"));
peer.acceptFrame(); // RST_STREAM
peer.sendFrame().ping(false, 2, 0);
peer.acceptFrame(); // PING
@@ -444,7 +439,7 @@ public final class SpdyConnectionTest {
// verify the peer received what was expected
MockSpdyPeer.InFrame rstStream = peer.takeFrame();
assertEquals(TYPE_RST_STREAM, rstStream.type);
assertEquals(42, rstStream.streamId);
assertEquals(41, rstStream.streamId);
assertEquals(INVALID_STREAM, rstStream.errorCode);
MockSpdyPeer.InFrame ping = peer.takeFrame();
assertEquals(2, ping.payload1);
@@ -656,7 +651,7 @@ public final class SpdyConnectionTest {
peer.play();
// play it back
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build();
SpdyConnection connection = connection(peer, SPDY3);
SpdyStream stream = connection.newStream(headerEntries("c", "cola"), true, true);
assertEquals(headerEntries("a", "android"), stream.getResponseHeaders());
connection.ping().roundTripTime(); // Ensure that the 2nd SYN REPLY has been received.
@@ -721,7 +716,7 @@ public final class SpdyConnectionTest {
peer.play();
// play it back
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build();
SpdyConnection connection = connection(peer, SPDY3);
SpdyStream stream = connection.newStream(headerEntries("b", "banana"), true, true);
assertEquals(headerEntries("a", "android"), stream.getResponseHeaders());
assertStreamData("robot", stream.getSource());
@@ -745,7 +740,7 @@ public final class SpdyConnectionTest {
peer.play();
// play it back
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build();
SpdyConnection connection = connection(peer, SPDY3);
SpdyStream stream = connection.newStream(headerEntries("a", "android"), true, true);
assertEquals(headerEntries("b", "banana"), stream.getResponseHeaders());
@@ -767,7 +762,7 @@ public final class SpdyConnectionTest {
peer.play();
// play it back
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build();
SpdyConnection connection = connection(peer, SPDY3);
SpdyStream stream = connection.newStream(headerEntries("a", "android"), true, true);
try {
stream.getResponseHeaders();
@@ -855,7 +850,7 @@ public final class SpdyConnectionTest {
peer.play();
// play it back
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build();
SpdyConnection connection = connection(peer, SPDY3);
connection.newStream(headerEntries("a", "android"), true, true);
Ping ping = connection.ping();
connection.shutdown(PROTOCOL_ERROR);
@@ -879,7 +874,7 @@ public final class SpdyConnectionTest {
peer.play();
// play it back
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build();
SpdyConnection connection = connection(peer, SPDY3);
connection.shutdown(INTERNAL_ERROR);
try {
connection.ping();
@@ -902,7 +897,7 @@ public final class SpdyConnectionTest {
peer.play();
// play it back
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build();
SpdyConnection connection = connection(peer, SPDY3);
SpdyStream stream = connection.newStream(headerEntries("a", "android"), true, true);
assertEquals(1, connection.openStreamCount());
connection.close();
@@ -946,7 +941,7 @@ public final class SpdyConnectionTest {
peer.play();
// play it back
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build();
SpdyConnection connection = connection(peer, SPDY3);
Ping ping = connection.ping();
connection.close();
assertEquals(-1, ping.roundTripTime());
@@ -961,7 +956,7 @@ public final class SpdyConnectionTest {
peer.play();
// play it back
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build();
SpdyConnection connection = connection(peer, SPDY3);
SpdyStream stream = connection.newStream(headerEntries("b", "banana"), true, true);
stream.setReadTimeout(1000);
Source source = stream.getSource();
@@ -991,7 +986,7 @@ public final class SpdyConnectionTest {
peer.play();
// play it back
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build();
SpdyConnection connection = connection(peer, SPDY3);
SpdyStream stream = connection.newStream(headerEntries("b", "banana"), true, true);
connection.ping().roundTripTime(); // Ensure that the HEADERS has been received.
assertEquals(headerEntries("a", "android", "c", "c3po"), stream.getResponseHeaders());
@@ -1014,7 +1009,7 @@ public final class SpdyConnectionTest {
peer.play();
// play it back
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build();
SpdyConnection connection = connection(peer, SPDY3);
SpdyStream stream = connection.newStream(headerEntries("b", "banana"), true, true);
connection.ping().roundTripTime(); // Ensure that the HEADERS has been received.
try {
@@ -1168,7 +1163,7 @@ public final class SpdyConnectionTest {
peer.play();
// Play it back.
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build();
SpdyConnection connection = connection(peer, SPDY3);
SpdyStream stream = connection.newStream(headerEntries("b", "banana"), true, true);
BufferedSink out = Okio.buffer(stream.getSink());
out.write(new byte[INITIAL_WINDOW_SIZE]);
@@ -1210,7 +1205,7 @@ public final class SpdyConnectionTest {
peer.play();
// Play it back.
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build();
SpdyConnection connection = connection(peer, SPDY3);
SpdyStream stream = connection.newStream(headerEntries("a", "apple"), true, true);
BufferedSink out = Okio.buffer(stream.getSink());
out.write(new byte[INITIAL_WINDOW_SIZE]);
@@ -1255,7 +1250,7 @@ public final class SpdyConnectionTest {
peer.play();
// play it back
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build();
SpdyConnection connection = connection(peer, SPDY3);
SpdyStream stream = connection.newStream(headerEntries("b", "banana"), true, true);
assertEquals(headerEntries("a", "android"), stream.getResponseHeaders());
Source in = stream.getSource();
@@ -1280,7 +1275,7 @@ public final class SpdyConnectionTest {
peer.play();
// Play it back.
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build();
SpdyConnection connection = connection(peer, SPDY3);
SpdyStream stream1 = connection.newStream(headerEntries("a", "apple"), true, true);
BufferedSink out1 = Okio.buffer(stream1.getSink());
out1.write(new byte[INITIAL_WINDOW_SIZE]);
@@ -1397,14 +1392,93 @@ public final class SpdyConnectionTest {
peer.play();
// play it back
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build();
SpdyConnection connection = connection(peer, SPDY3);
SpdyStream stream = connection.newStream(headerEntries("b", "banana"), true, true);
assertEquals("a", stream.getResponseHeaders().get(0).name.utf8());
assertEquals(length, stream.getResponseHeaders().get(0).value.size());
assertStreamData("robot", stream.getSource());
}
// TODO: change this to only cancel when local settings disable push
@Test public void pushPromiseStream() throws Exception {
peer.setVariantAndClient(HTTP_20_DRAFT_09, false);
// write the mocking script
peer.acceptFrame(); // SYN_STREAM
peer.sendFrame().synReply(false, 1, headerEntries("a", "android"));
final List<Header> expectedRequestHeaders = Arrays.asList(
new Header(Header.TARGET_METHOD, "GET"),
new Header(Header.TARGET_SCHEME, "https"),
new Header(Header.TARGET_AUTHORITY, "squareup.com"),
new Header(Header.TARGET_PATH, "/cached")
);
peer.sendFrame().pushPromise(1, 2, expectedRequestHeaders);
final List<Header> expectedResponseHeaders = Arrays.asList(
new Header(Header.RESPONSE_STATUS, "200")
);
peer.sendFrame().synReply(true, 2, expectedResponseHeaders);
peer.sendFrame().data(true, 1, data(0));
peer.play();
final List events = new ArrayList();
PushObserver observer = new PushObserver() {
@Override public boolean onRequest(int streamId, List<Header> requestHeaders) {
assertEquals(2, streamId);
events.add(requestHeaders);
return false;
}
@Override public boolean onHeaders(int streamId, List<Header> responseHeaders, boolean last) {
assertEquals(2, streamId);
assertTrue(last);
events.add(responseHeaders);
return false;
}
@Override public boolean onData(int streamId, BufferedSource source, int byteCount,
boolean last) throws IOException {
events.add(new AssertionError("onData"));
return false;
}
@Override public void onReset(int streamId, ErrorCode errorCode) {
events.add(new AssertionError("onReset"));
}
};
// play it back
SpdyConnection connection = connectionBuilder(peer, HTTP_20_DRAFT_09)
.pushObserver(observer).build();
SpdyStream client = connection.newStream(headerEntries("b", "banana"), false, true);
assertEquals(-1, client.getSource().read(new OkBuffer(), 1));
// verify the peer received what was expected
assertEquals(TYPE_HEADERS, peer.takeFrame().type);
assertEquals(2, events.size());
assertEquals(expectedRequestHeaders, events.get(0));
assertEquals(expectedResponseHeaders, events.get(1));
}
@Test public void doublePushPromise() throws Exception {
peer.setVariantAndClient(HTTP_20_DRAFT_09, false);
// write the mocking script
peer.sendFrame().pushPromise(1,2, headerEntries("a", "android"));
peer.acceptFrame(); // SYN_REPLY
peer.sendFrame().pushPromise(1, 2, headerEntries("b", "banana"));
peer.acceptFrame(); // RST_STREAM
peer.play();
// play it back
SpdyConnection connection = connectionBuilder(peer, HTTP_20_DRAFT_09).build();
connection.newStream(headerEntries("b", "banana"), false, true);
// verify the peer received what was expected
assertEquals(TYPE_HEADERS, peer.takeFrame().type);
assertEquals(PROTOCOL_ERROR, peer.takeFrame().errorCode);
}
@Test public void pushPromiseStreamsAutomaticallyCancel() throws Exception {
peer.setVariantAndClient(HTTP_20_DRAFT_09, false);
@@ -1422,7 +1496,8 @@ public final class SpdyConnectionTest {
peer.play();
// play it back
connection(peer, HTTP_20_DRAFT_09);
connectionBuilder(peer, HTTP_20_DRAFT_09)
.pushObserver(PushObserver.CANCEL).build();
// verify the peer received what was expected
MockSpdyPeer.InFrame rstStream = peer.takeFrame();
@@ -1450,12 +1525,14 @@ public final class SpdyConnectionTest {
}
private SpdyConnection connection(MockSpdyPeer peer, Variant variant) throws IOException {
return connectionBuilder(peer, variant).handler(REJECT_INCOMING_STREAMS).build();
return connectionBuilder(peer, variant).build();
}
private SpdyConnection.Builder connectionBuilder(MockSpdyPeer peer, Variant variant)
throws IOException {
return new SpdyConnection.Builder(true, peer.openSocket()).protocol(variant.getProtocol());
return new SpdyConnection.Builder(true, peer.openSocket())
.pushObserver(IGNORE)
.protocol(variant.getProtocol());
}
private void assertStreamData(String expected, Source source) throws IOException {
@@ -1493,4 +1570,24 @@ public final class SpdyConnectionTest {
static int roundUp(int num, int divisor) {
return (num + divisor - 1) / divisor;
}
static final PushObserver IGNORE = new PushObserver() {
@Override public boolean onRequest(int streamId, List<Header> requestHeaders) {
return false;
}
@Override public boolean onHeaders(int streamId, List<Header> responseHeaders, boolean last) {
return false;
}
@Override public boolean onData(int streamId, BufferedSource source, int byteCount,
boolean last) throws IOException {
source.skip(byteCount);
return false;
}
@Override public void onReset(int streamId, ErrorCode errorCode) {
}
};
}

View File

@@ -0,0 +1,85 @@
/*
* Copyright (C) 2014 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.squareup.okhttp.internal.spdy;
import java.io.IOException;
import java.util.List;
import okio.BufferedSource;
/**
* {@link com.squareup.okhttp.Protocol#HTTP_2 HTTP/2} only.
* Processes server-initiated HTTP requestd on the client.
*
* <p>Use the stream ID to correlate response headers and data.
*
* <p>Return true to request cancellation of a pushed stream. Note that this
* does not guarantee future frames won't arrive on the stream ID.
*/
public interface PushObserver {
/**
* Describes the request that the server intends to push a response for.
*
* @param streamId server-initiated stream ID: an even number.
* @param requestHeaders minimally includes {@code :method}, {@code :scheme},
* {@code :authority}, and (@code :path}.
*/
boolean onRequest(int streamId, List<Header> requestHeaders);
/**
* The response headers corresponding to a pushed request. When {@code last}
* is true, there are no data frames to follow.
*
* @param streamId server-initiated stream ID: an even number.
* @param responseHeaders minimally includes {@code :status}.
* @param last when true, there is no response data.
*/
boolean onHeaders(int streamId, List<Header> responseHeaders, boolean last);
/**
* A chunk of response data corresponding to a pushed request. This data
* must either be read or skipped.
*
* @param streamId server-initiated stream ID: an even number.
* @param source location of data corresponding with this stream ID.
* @param byteCount number of bytes to read or skip from the source.
* @param last when true, there are no data frames to follow.
*/
boolean onData(int streamId, BufferedSource source, int byteCount, boolean last)
throws IOException;
/** Indicates the reason why this stream was cancelled. */
void onReset(int streamId, ErrorCode errorCode);
PushObserver CANCEL = new PushObserver() {
@Override public boolean onRequest(int streamId, List<Header> requestHeaders) {
return true;
}
@Override public boolean onHeaders(int streamId, List<Header> responseHeaders, boolean last) {
return true;
}
@Override public boolean onData(int streamId, BufferedSource source, int byteCount,
boolean last) throws IOException {
source.skip(byteCount);
return true;
}
@Override public void onReset(int streamId, ErrorCode errorCode) {
}
};
}

View File

@@ -24,8 +24,10 @@ import java.io.InterruptedIOException;
import java.net.Socket;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -83,6 +85,8 @@ public final class SpdyConnection implements Closeable {
/** Lazily-created map of in-flight pings awaiting a response. Guarded by this. */
private Map<Integer, Ping> pings;
/** User code to run in response to push promise events. */
private final PushObserver pushObserver;
private int nextPingId;
static final int INITIAL_WINDOW_SIZE = 65535;
@@ -123,6 +127,7 @@ public final class SpdyConnection implements Closeable {
private SpdyConnection(Builder builder) {
protocol = builder.protocol;
pushObserver = builder.pushObserver;
client = builder.client;
handler = builder.handler;
nextStreamId = builder.client ? 1 : 2;
@@ -157,7 +162,7 @@ public final class SpdyConnection implements Closeable {
/** The protocol as selected using NPN or ALPN. */
public Protocol getProtocol() {
return protocol;
return protocol;
}
/**
@@ -208,6 +213,7 @@ public final class SpdyConnection implements Closeable {
public SpdyStream pushStream(int associatedStreamId, List<Header> requestHeaders, boolean out)
throws IOException {
if (client) throw new IllegalStateException("Client cannot push requests.");
if (protocol != Protocol.HTTP_2) throw new IllegalStateException("protocol != HTTP_2");
return newStream(associatedStreamId, requestHeaders, out, false);
}
@@ -219,14 +225,13 @@ public final class SpdyConnection implements Closeable {
* @param in true to create an input stream that the remote peer can use to send data to us.
* Corresponds to {@code FLAG_UNIDIRECTIONAL}.
*/
public SpdyStream newStream(List<Header> requestHeaders, boolean out,
boolean in) throws IOException {
public SpdyStream newStream(List<Header> requestHeaders, boolean out, boolean in)
throws IOException {
return newStream(0, requestHeaders, out, in);
}
private SpdyStream newStream(int associatedStreamId, List<Header> requestHeaders, boolean out,
boolean in)
throws IOException {
boolean in) throws IOException {
boolean outFinished = !out;
boolean inFinished = !in;
int priority = -1; // TODO: permit the caller to specify a priority?
@@ -250,7 +255,8 @@ public final class SpdyConnection implements Closeable {
if (associatedStreamId == 0) {
frameWriter.synStream(outFinished, inFinished, streamId, associatedStreamId, priority, slot,
requestHeaders);
} else {
} else { // HTTP/2 has a PUSH_PROMISE frame.
if (client) throw new IOException("Client attempted to push stream: " + associatedStreamId);
frameWriter.pushPromise(associatedStreamId, streamId, requestHeaders);
}
}
@@ -491,6 +497,7 @@ public final class SpdyConnection implements Closeable {
private BufferedSink sink;
private IncomingStreamHandler handler = IncomingStreamHandler.REFUSE_INCOMING_STREAMS;
private Protocol protocol = Protocol.SPDY_3;
private PushObserver pushObserver = PushObserver.CANCEL;
private boolean client;
public Builder(boolean client, Socket socket) throws IOException {
@@ -519,6 +526,11 @@ public final class SpdyConnection implements Closeable {
return this;
}
public Builder pushObserver(PushObserver pushObserver) {
this.pushObserver = pushObserver;
return this;
}
public SpdyConnection build() {
return new SpdyConnection(this);
}
@@ -557,6 +569,10 @@ public final class SpdyConnection implements Closeable {
@Override public void data(boolean inFinished, int streamId, BufferedSource source, int length)
throws IOException {
if (pushedStream(streamId)) {
pushDataLater(streamId, source, length, inFinished);
return;
}
SpdyStream dataStream = getStream(streamId);
if (dataStream == null) {
writeSynResetLater(streamId, ErrorCode.INVALID_STREAM);
@@ -570,8 +586,11 @@ public final class SpdyConnection implements Closeable {
}
@Override public void headers(boolean outFinished, boolean inFinished, int streamId,
int associatedStreamId, int priority, List<Header> headerBlock,
HeadersMode headersMode) {
int associatedStreamId, int priority, List<Header> headerBlock, HeadersMode headersMode) {
if (pushedStream(streamId)) {
pushHeadersLater(streamId, headerBlock, inFinished);
return;
}
SpdyStream stream;
synchronized (SpdyConnection.this) {
// If we're shutdown, don't bother with this stream.
@@ -623,6 +642,10 @@ public final class SpdyConnection implements Closeable {
}
@Override public void rstStream(int streamId, ErrorCode errorCode) {
if (pushedStream(streamId)) {
pushResetLater(streamId, errorCode);
return;
}
SpdyStream rstStream = removeStream(streamId);
if (rstStream != null) {
rstStream.receiveRstStream(errorCode);
@@ -730,21 +753,95 @@ public final class SpdyConnection implements Closeable {
}
@Override
public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)
throws IOException {
// TODO: Wire up properly and only cancel when local settings disable push.
cancelStreamLater(promisedStreamId);
}
private void cancelStreamLater(final int streamId) {
executor.submit(new NamedRunnable("OkHttp %s Cancelling Stream %s", hostName, streamId) {
@Override public void execute() {
try {
frameWriter.rstStream(streamId, ErrorCode.CANCEL);
} catch (IOException ignored) {
}
}
});
public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders) {
pushRequestLater(promisedStreamId, requestHeaders);
}
}
/** Even, positive numbered streams are pushed streams in HTTP/2. */
private boolean pushedStream(int streamId) {
return protocol == Protocol.HTTP_2 && streamId != 0 && (streamId & 1) == 0;
}
// Guarded by this.
private final Set<Integer> currentPushRequests = new LinkedHashSet<Integer>();
private void pushRequestLater(final int streamId, final List<Header> requestHeaders) {
synchronized (this) {
if (currentPushRequests.contains(streamId)) {
writeSynResetLater(streamId, ErrorCode.PROTOCOL_ERROR);
return;
}
currentPushRequests.add(streamId);
}
executor.submit(new NamedRunnable("OkHttp %s Push Request[%s]", hostName, streamId) {
@Override public void execute() {
boolean cancel = pushObserver.onRequest(streamId, requestHeaders);
try {
if (cancel) {
frameWriter.rstStream(streamId, ErrorCode.CANCEL);
synchronized (SpdyConnection.this) {
currentPushRequests.remove(streamId);
}
}
} catch (IOException ignored) {
}
}
});
}
private void pushHeadersLater(final int streamId, final List<Header> requestHeaders,
final boolean inFinished) {
executor.submit(new NamedRunnable("OkHttp %s Push Headers[%s]", hostName, streamId) {
@Override public void execute() {
boolean cancel = pushObserver.onHeaders(streamId, requestHeaders, inFinished);
try {
if (cancel) frameWriter.rstStream(streamId, ErrorCode.CANCEL);
if (cancel || inFinished) {
synchronized (SpdyConnection.this) {
currentPushRequests.remove(streamId);
}
}
} catch (IOException ignored) {
}
}
});
}
/**
* Eagerly reads {@code byteCount} bytes from the source before launching a background task to
* process the data. This avoids corrupting the stream.
*/
private void pushDataLater(final int streamId, final BufferedSource source, final int byteCount,
final boolean inFinished) throws IOException {
final OkBuffer buffer = new OkBuffer();
source.require(byteCount); // Eagerly read the frame before firing client thread.
source.read(buffer, byteCount);
if (buffer.size() != byteCount) throw new IOException(buffer.size() + " != " + byteCount);
executor.submit(new NamedRunnable("OkHttp %s Push Data[%s]", hostName, streamId) {
@Override public void execute() {
try {
boolean cancel = pushObserver.onData(streamId, buffer, byteCount, inFinished);
if (cancel) frameWriter.rstStream(streamId, ErrorCode.CANCEL);
if (cancel || inFinished) {
synchronized (SpdyConnection.this) {
currentPushRequests.remove(streamId);
}
}
} catch (IOException ignored) {
}
}
});
}
private void pushResetLater(final int streamId, final ErrorCode errorCode) {
executor.submit(new NamedRunnable("OkHttp %s Push Reset[%s]", hostName, streamId) {
@Override public void execute() {
pushObserver.onReset(streamId, errorCode);
synchronized (SpdyConnection.this) {
currentPushRequests.remove(streamId);
}
}
});
}
}