1
0
mirror of https://github.com/square/okhttp.git synced 2026-01-24 04:02:07 +03:00

Basic support for reading and writing http/2 push promise frames.

This commit is contained in:
Adrian Cole
2014-01-20 18:25:08 -08:00
parent 47c640a1a4
commit 4a62aef3f8
10 changed files with 221 additions and 20 deletions

View File

@@ -52,5 +52,23 @@ public interface FrameReader extends Closeable {
void goAway(int lastGoodStreamId, ErrorCode errorCode);
void windowUpdate(int streamId, int deltaWindowSize, boolean endFlowControl);
void priority(int streamId, int priority);
/**
* HTTP/2 only. Receive a push promise header block.
* <p/>
* A push promise contains all the headers that pertain to a server-initiated
* request, and a {@code promisedStreamId} to which response frames will be
* delivered. Push promise frames are sent as a part of the response to
* {@code streamId}. The {@code promisedStreamId} has a priority of one
* greater than {@code streamId}.
*
* @param streamId client-initiated stream ID. Must be an odd number.
* @param promisedStreamId server-initiated stream ID. Must be an even
* number.
* @param requestHeaders minimally includes {@code :method}, {@code :scheme},
* {@code :authority}, and (@code :path}.
*/
void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)
throws IOException;
}
}

View File

@@ -26,6 +26,24 @@ public interface FrameWriter extends Closeable {
void connectionHeader() throws IOException;
void ackSettings() throws IOException;
/**
* HTTP/2 only. Send a push promise header block.
* <p/>
* A push promise contains all the headers that pertain to a server-initiated
* request, and a {@code promisedStreamId} to which response frames will be
* delivered. Push promise frames are sent as a part of the response to
* {@code streamId}. The {@code promisedStreamId} has a priority of one
* greater than {@code streamId}.
*
* @param streamId client-initiated stream ID. Must be an odd number.
* @param promisedStreamId server-initiated stream ID. Must be an even
* number.
* @param requestHeaders minimally includes {@code :method}, {@code :scheme},
* {@code :authority}, and (@code :path}.
*/
void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)
throws IOException;
/** SPDY/3 only. */
void flush() throws IOException;
void synStream(boolean outFinished, boolean inFinished, int streamId, int associatedStreamId,

View File

@@ -24,7 +24,6 @@ import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.List;
@@ -40,11 +39,7 @@ public final class Http20Draft09 implements Variant {
// http://tools.ietf.org/html/draft-ietf-httpbis-http2-09#section-6.5
@Override public Settings defaultOkHttpSettings(boolean client) {
Settings settings = initialPeerSettings(client);
if (client) { // TODO: we don't yet support reading push.
settings.set(Settings.ENABLE_PUSH, 0, 0);
}
return settings;
return initialPeerSettings(client);
}
@Override public Settings initialPeerSettings(boolean client) {
@@ -57,14 +52,8 @@ public final class Http20Draft09 implements Variant {
return settings;
}
private static final byte[] CONNECTION_HEADER;
static {
try {
CONNECTION_HEADER = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
throw new AssertionError();
}
}
private static final byte[] CONNECTION_HEADER =
"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes(Util.UTF_8);
static final int TYPE_DATA = 0x0;
static final int TYPE_HEADERS = 0x1;
@@ -79,8 +68,9 @@ public final class Http20Draft09 implements Variant {
static final int FLAG_END_STREAM = 0x1;
/** Used for headers, push-promise and continuation. */
/** Used for headers and continuation. */
static final int FLAG_END_HEADERS = 0x4;
static final int FLAG_END_PUSH_PROMISE = 0x4;
static final int FLAG_PRIORITY = 0x8;
static final int FLAG_ACK = 0x1;
static final int FLAG_END_FLOW_CONTROL = 0x1;
@@ -252,8 +242,17 @@ public final class Http20Draft09 implements Variant {
}
}
private void readPushPromise(Handler handler, int flags, int length, int streamId) {
// TODO:
private void readPushPromise(Handler handler, int flags, int length, int streamId)
throws IOException {
if (streamId == 0) {
throw ioException("PROTOCOL_ERROR: TYPE_PUSH_PROMISE streamId == 0");
}
boolean endHeaders = (flags & FLAG_END_PUSH_PROMISE) != 0;
int promisedStreamId = in.readInt() & 0x7fffffff;
List<Header> headerBlock = readHeaderBlock(length, endHeaders, streamId);
handler.pushPromise(streamId, promisedStreamId, headerBlock);
}
private void readPing(Handler handler, int flags, int length, int streamId) throws IOException {
@@ -341,6 +340,22 @@ public final class Http20Draft09 implements Variant {
headers(false, streamId, -1, headerBlock);
}
@Override
public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)
throws IOException {
hpackBuffer.reset();
hpackWriter.writeHeaders(requestHeaders);
int type = TYPE_PUSH_PROMISE;
// TODO: implement CONTINUATION
int length = hpackBuffer.size();
checkFrameSize(length);
int flags = FLAG_END_HEADERS;
out.writeInt((length & 0x3fff) << 16 | (type & 0xff) << 8 | (flags & 0xff));
out.writeInt(streamId & 0x7fffffff);
out.writeInt(promisedStreamId & 0x7fffffff);
hpackBuffer.writeTo(out);
}
private void headers(boolean outFinished, int streamId, int priority,
List<Header> headerBlock) throws IOException {
hpackBuffer.reset();

View File

@@ -322,6 +322,12 @@ final class Spdy3 implements Variant {
// Do nothing: no ACK for SPDY/3 settings.
}
@Override
public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)
throws IOException {
// Do nothing: no push promise for SPDY/3.
}
@Override public synchronized void connectionHeader() {
// Do nothing: no connection header for SPDY/3.
}

View File

@@ -630,5 +630,23 @@ public final class SpdyConnection implements Closeable {
@Override public void priority(int streamId, int priority) {
// TODO: honor priority.
}
@Override
public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)
throws IOException {
// TODO: Wire up properly and only cancel when local settings disable push.
cancelStreamLater(promisedStreamId);
}
private void cancelStreamLater(final int streamId) {
executor.submit(new NamedRunnable("OkHttp %s Cancelling Stream %s", hostName, streamId) {
@Override public void execute() {
try {
frameWriter.rstStream(streamId, ErrorCode.CANCEL);
} catch (IOException ignored) {
}
}
});
}
}
}

View File

@@ -61,4 +61,9 @@ class BaseTestHandler implements FrameReader.Handler {
@Override public void priority(int streamId, int priority) {
fail();
}
@Override
public void pushPromise(int streamId, int associatedStreamId, List<Header> headerBlock) {
fail();
}
}

View File

@@ -19,6 +19,7 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.junit.Test;
@@ -146,6 +147,88 @@ public class Http20Draft09Test {
});
}
@Test public void pushPromise() throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputStream dataOut = new DataOutputStream(out);
final int expectedPromisedStreamId = 11;
final List<Header> pushPromise = Arrays.asList(
new Header(Header.TARGET_METHOD, "GET"),
new Header(Header.TARGET_SCHEME, "https"),
new Header(Header.TARGET_AUTHORITY, "squareup.com"),
new Header(Header.TARGET_PATH, "/")
);
{ // Write the push promise frame, specifying the associated stream ID.
byte[] headerBytes = literalHeaders(pushPromise);
dataOut.writeShort(headerBytes.length);
dataOut.write(Http20Draft09.TYPE_PUSH_PROMISE);
dataOut.write(Http20Draft09.FLAG_END_PUSH_PROMISE);
dataOut.writeInt(expectedStreamId & 0x7fffffff);
dataOut.writeInt(expectedPromisedStreamId & 0x7fffffff);
dataOut.write(headerBytes);
}
FrameReader fr = newReader(out);
// Consume the headers frame.
fr.nextFrame(new BaseTestHandler() {
@Override
public void pushPromise(int streamId, int promisedStreamId, List<Header> headerBlock) {
assertEquals(expectedStreamId, streamId);
assertEquals(expectedPromisedStreamId, promisedStreamId);
assertEquals(pushPromise, headerBlock);
}
});
}
/** Headers are compressed, then framed. */
@Test public void pushPromiseThenContinuation() throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputStream dataOut = new DataOutputStream(out);
final int expectedPromisedStreamId = 11;
final List<Header> pushPromise = Arrays.asList(
new Header(Header.TARGET_METHOD, "GET"),
new Header(Header.TARGET_SCHEME, "https"),
new Header(Header.TARGET_AUTHORITY, "squareup.com"),
new Header(Header.TARGET_PATH, "/")
);
// Decoding the first header will cross frame boundaries.
byte[] headerBlock = literalHeaders(pushPromise);
{ // Write the first headers frame.
dataOut.writeShort(headerBlock.length / 2);
dataOut.write(Http20Draft09.TYPE_PUSH_PROMISE);
dataOut.write(0); // no flags
dataOut.writeInt(expectedStreamId & 0x7fffffff);
dataOut.writeInt(expectedPromisedStreamId & 0x7fffffff);
dataOut.write(headerBlock, 0, headerBlock.length / 2);
}
{ // Write the continuation frame, specifying no more frames are expected.
dataOut.writeShort(headerBlock.length / 2);
dataOut.write(Http20Draft09.TYPE_CONTINUATION);
dataOut.write(Http20Draft09.FLAG_END_HEADERS);
dataOut.writeInt(expectedStreamId & 0x7fffffff);
dataOut.write(headerBlock, headerBlock.length / 2, headerBlock.length / 2);
}
FrameReader fr = newReader(out);
// Reading the above frames should result in a concatenated headerBlock.
fr.nextFrame(new BaseTestHandler() {
@Override
public void pushPromise(int streamId, int promisedStreamId, List<Header> headerBlock) {
assertEquals(expectedStreamId, streamId);
assertEquals(expectedPromisedStreamId, promisedStreamId);
assertEquals(pushPromise, headerBlock);
}
});
}
@Test public void readRstStreamFrame() throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputStream dataOut = new DataOutputStream(out);

View File

@@ -262,5 +262,13 @@ public final class MockSpdyPeer implements Closeable {
@Override public void priority(int streamId, int priority) {
throw new UnsupportedOperationException();
}
@Override
public void pushPromise(int streamId, int associatedStreamId, List<Header> headerBlock) {
this.type = Http20Draft09.TYPE_PUSH_PROMISE;
this.streamId = streamId;
this.associatedStreamId = associatedStreamId;
this.headerBlock = headerBlock;
}
}
}
}

View File

@@ -13,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.squareup.okhttp.internal.spdy;
import com.squareup.okhttp.Protocol;
@@ -25,6 +24,7 @@ import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
@@ -1075,6 +1075,36 @@ public final class SpdyConnectionTest {
assertStreamData("robot", stream.getInputStream());
}
// TODO: change this to only cancel when local settings disable push
@Test public void pushPromiseStreamsAutomaticallyCancel() throws Exception {
MockSpdyPeer peer = new MockSpdyPeer(Variant.HTTP_20_DRAFT_09, false);
// write the mocking script
peer.sendFrame().pushPromise(1, 2, Arrays.asList(
new Header(Header.TARGET_METHOD, "GET"),
new Header(Header.TARGET_SCHEME, "https"),
new Header(Header.TARGET_AUTHORITY, "squareup.com"),
new Header(Header.TARGET_PATH, "/cached")
));
peer.sendFrame().synReply(true, 1, Arrays.asList(
new Header(Header.RESPONSE_STATUS, "200")
));
peer.acceptFrame(); // RST_STREAM
peer.play();
// play it back
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket())
.protocol(Protocol.HTTP_2)
.handler(REJECT_INCOMING_STREAMS)
.build();
// verify the peer received what was expected
MockSpdyPeer.InFrame rstStream = peer.takeFrame();
assertEquals(TYPE_RST_STREAM, rstStream.type);
assertEquals(2, rstStream.streamId);
assertEquals(CANCEL, rstStream.errorCode);
}
private SpdyConnection sendHttp2SettingsAndCheckForAck(boolean client, Settings settings)
throws IOException, InterruptedException {
MockSpdyPeer peer = new MockSpdyPeer(Variant.HTTP_20_DRAFT_09, client);

View File

@@ -333,7 +333,7 @@ public class HttpEngine {
private boolean isRecoverable(IOException e) {
// If the problem was a CertificateException from the X509TrustManager,
// do not retry, we didn't have an abrupt server initiated exception.
// do not retry, we didn't have an abrupt server-initiated exception.
boolean sslFailure =
e instanceof SSLHandshakeException && e.getCause() instanceof CertificateException;
boolean protocolFailure = e instanceof ProtocolException;