1
0
mirror of https://github.com/square/okhttp.git synced 2026-01-24 04:02:07 +03:00

Honor change in connection-level initial write window from peer settings.

This commit is contained in:
Adrian Cole
2014-01-26 11:33:49 -08:00
parent b9b5c8462d
commit d4508141fc
13 changed files with 127 additions and 58 deletions

View File

@@ -26,7 +26,7 @@ import java.util.List;
* <a href="http://tools.ietf.org/html/draft-agl-tls-nextprotoneg-04">NPN</a> or
* <a href="http://tools.ietf.org/html/draft-ietf-tls-applayerprotoneg">ALPN</a> selection.
*
* <p/>
* <p>
* <h3>Protocol vs Scheme</h3>
* Despite its name, {@link java.net.URL#getProtocol()} returns the
* {@link java.net.URI#getScheme() scheme} (http, https, etc.) of the URL, not

View File

@@ -44,7 +44,7 @@ import javax.net.ssl.SSLSocket;
* <h3>ALPN and NPN</h3>
* This class uses TLS extensions ALPN and NPN to negotiate the upgrade from
* HTTP/1.1 (the default protocol to use with TLS on port 443) to either SPDY
* or HTTP/2.0.
* or HTTP/2.
*
* <p>NPN (Next Protocol Negotiation) was developed for SPDY. It is widely
* available and we support it on both Android (4.1+) and OpenJDK 7 (via the

View File

@@ -21,7 +21,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.List;
/** Reads transport frames for SPDY/3 or HTTP/2.0. */
/** Reads transport frames for SPDY/3 or HTTP/2. */
public interface FrameReader extends Closeable {
void readConnectionHeader() throws IOException;
boolean nextFrame(Handler handler) throws IOException;
@@ -32,7 +32,7 @@ public interface FrameReader extends Closeable {
/**
* Create or update incoming headers, creating the corresponding streams
* if necessary. Frames that trigger this are SPDY SYN_STREAM, HEADERS, and
* SYN_REPLY, and HTTP/2.0 HEADERS and PUSH_PROMISE.
* SYN_REPLY, and HTTP/2 HEADERS and PUSH_PROMISE.
*
* @param outFinished true if the receiver should not send further frames.
* @param inFinished true if the sender will not send further frames.
@@ -40,23 +40,28 @@ public interface FrameReader extends Closeable {
* @param associatedStreamId the stream that triggered the sender to create
* this stream.
* @param priority or -1 for no priority. For SPDY, priorities range from 0
* (highest) thru 7 (lowest). For HTTP/2.0, priorities range from 0
* (highest) thru 7 (lowest). For HTTP/2, priorities range from 0
* (highest) thru 2^31-1 (lowest), defaulting to 2^30.
*/
void headers(boolean outFinished, boolean inFinished, int streamId, int associatedStreamId,
int priority, List<Header> headerBlock, HeadersMode headersMode);
void rstStream(int streamId, ErrorCode errorCode);
void settings(boolean clearPrevious, Settings settings);
/** HTTP/2 only. */
void ackSettings();
/** SPDY/3 only. */
void noop();
/**
* Read a connection-level ping from the peer. {@code ack} indicates this
* is a reply. Payload parameters are different between SPDY/3 and HTTP/2.
* <p/>
* <p>
* In SPDY/3, only the first {@code payload1} parameter is set. If the
* reader is a client, it is an unsigned even number. Likewise, a server
* will receive an odd number.
* <p/>
* <p>
* In HTTP/2, both {@code payload1} and {@code payload2} parameters are
* set. The data is opaque binary, and there are no rules on the content.
*/
@@ -84,7 +89,7 @@ public interface FrameReader extends Closeable {
/**
* HTTP/2 only. Receive a push promise header block.
* <p/>
* <p>
* A push promise contains all the headers that pertain to a server-initiated
* request, and a {@code promisedStreamId} to which response frames will be
* delivered. Push promise frames are sent as a part of the response to

View File

@@ -20,15 +20,15 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.List;
/** Writes transport frames for SPDY/3 or HTTP/2.0. */
/** Writes transport frames for SPDY/3 or HTTP/2. */
public interface FrameWriter extends Closeable {
/** HTTP/2.0 only. */
/** HTTP/2 only. */
void connectionHeader() throws IOException;
void ackSettings() throws IOException;
/**
* HTTP/2 only. Send a push promise header block.
* <p/>
* <p>
* A push promise contains all the headers that pertain to a server-initiated
* request, and a {@code promisedStreamId} to which response frames will be
* delivered. Push promise frames are sent as a part of the response to
@@ -68,16 +68,18 @@ public interface FrameWriter extends Closeable {
/** Write okhttp's settings to the peer. */
void settings(Settings okHttpSettings) throws IOException;
/** SPDY/3 only. */
void noop() throws IOException;
/**
* Send a connection-level ping to the peer. {@code ack} indicates this is
* a reply. Payload parameters are different between SPDY/3 and HTTP/2.
* <p/>
* <p>
* In SPDY/3, only the first {@code payload1} parameter is sent. If the
* sender is a client, it is an unsigned odd number. Likewise, a server
* will send an even number.
* <p/>
* <p>
* In HTTP/2, both {@code payload1} and {@code payload2} parameters are
* sent. The data is opaque binary, and there are no rules on the content.
*/

View File

@@ -132,7 +132,7 @@ final class HpackDraft05 {
/**
* Called by the reader when the peer sent a new header table size setting.
* <p/>
* <p>
* Evicts entries or clears the table as needed.
*/
void maxHeaderTableByteCount(int newMaxHeaderTableByteCount) {

View File

@@ -37,17 +37,6 @@ public final class Http20Draft09 implements Variant {
return Protocol.HTTP_2;
}
// http://tools.ietf.org/html/draft-ietf-httpbis-http2-09#section-6.5
static Settings defaultSettings(boolean client) {
Settings settings = new Settings();
settings.set(Settings.HEADER_TABLE_SIZE, 0, 4096);
if (client) { // client specifies whether or not it accepts push.
settings.set(Settings.ENABLE_PUSH, 0, 1);
}
settings.set(Settings.INITIAL_WINDOW_SIZE, 0, 65535);
return settings;
}
private static final byte[] CONNECTION_HEADER =
"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes(Util.UTF_8);
@@ -225,12 +214,14 @@ public final class Http20Draft09 implements Variant {
private void readSettings(Handler handler, short length, byte flags, int streamId)
throws IOException {
if (streamId != 0) throw ioException("TYPE_SETTINGS streamId != 0");
if ((flags & FLAG_ACK) != 0) {
if (length != 0) throw ioException("FRAME_SIZE_ERROR ack frame should be empty!");
handler.ackSettings();
return;
}
if (length % 8 != 0) throw ioException("TYPE_SETTINGS length %% 8 != 0: %s", length);
if (streamId != 0) throw ioException("TYPE_SETTINGS streamId != 0");
Settings settings = new Settings();
for (int i = 0; i < length; i += 8) {
int w1 = in.readInt();
@@ -348,8 +339,7 @@ public final class Http20Draft09 implements Variant {
@Override
public synchronized void pushPromise(int streamId, int promisedStreamId,
List<Header> requestHeaders)
throws IOException {
List<Header> requestHeaders) throws IOException {
hpackBuffer.reset();
hpackWriter.writeHeaders(requestHeaders);

View File

@@ -36,12 +36,6 @@ final class Spdy3 implements Variant {
return Protocol.SPDY_3;
}
static Settings defaultSettings(boolean client) {
Settings settings = new Settings();
settings.set(Settings.INITIAL_WINDOW_SIZE, 0, 65535);
return settings;
}
static final int TYPE_DATA = 0x0;
static final int TYPE_SYN_STREAM = 0x1;
static final int TYPE_SYN_REPLY = 0x2;

View File

@@ -82,20 +82,30 @@ public final class SpdyConnection implements Closeable {
private Map<Integer, Ping> pings;
private int nextPingId;
/**
* Initial window size to use for the connection and new streams. Until the
* peer sends an update, this will is initialized to {@code 65535}.
*/
int initialWindowSize = 65535;
/**
* Count of bytes that can be written on the connection before receiving a
* window update.
*/
private long bytesLeftInWriteWindow = 65535; // TODO: initialize this with settings.
// Visible for testing
long bytesLeftInWriteWindow = initialWindowSize;
// TODO: Do we want to dynamically adjust settings, or KISS and only set once?
// Settings we might send include toggling push, adjusting compression table size.
final Settings okHttpSettings;
// TODO: MWS will need to guard on this setting before attempting to push.
final Settings peerSettings;
private boolean receivedInitialPeerSettings = false;
final FrameReader frameReader;
final FrameWriter frameWriter;
// Visible for testing
final Reader readerRunnable;
final ByteArrayPool bufferPool;
private SpdyConnection(Builder builder) {
@@ -108,23 +118,22 @@ public final class SpdyConnection implements Closeable {
Variant variant;
if (protocol == Protocol.HTTP_2) {
okHttpSettings = Http20Draft09.defaultSettings(client);
variant = new Http20Draft09(); // connection-specific settings here!
variant = new Http20Draft09();
} else if (protocol == Protocol.SPDY_3) {
okHttpSettings = Spdy3.defaultSettings(client);
variant = new Spdy3(); // connection-specific settings here!
variant = new Spdy3();
} else {
throw new AssertionError(protocol);
}
okHttpSettings = new Settings();
peerSettings = new Settings();
// TODO: implement stream limit
// okHttpSettings.set(Settings.MAX_CONCURRENT_STREAMS, 0, max);
peerSettings = okHttpSettings;
bufferPool = new ByteArrayPool(peerSettings.getInitialWindowSize() * 8);
frameReader = variant.newReader(builder.in, client);
frameWriter = variant.newWriter(builder.out, client);
new Thread(new Reader()).start(); // Not a daemon thread.
readerRunnable = new Reader();
new Thread(readerRunnable).start(); // Not a daemon thread.
}
/** The protocol as selected using NPN or ALPN. */
@@ -195,7 +204,7 @@ public final class SpdyConnection implements Closeable {
streamId = nextStreamId;
nextStreamId += 2;
stream = new SpdyStream(
streamId, this, outFinished, inFinished, priority, requestHeaders, peerSettings);
streamId, this, outFinished, inFinished, priority, requestHeaders, initialWindowSize);
if (stream.isOpen()) {
streams.put(streamId, stream);
setIdle(false);
@@ -234,6 +243,15 @@ public final class SpdyConnection implements Closeable {
}
}
/**
* {@code delta} will be negative if a settings frame initial window is
* smaller than the last.
*/
void addBytesToWriteWindow(long delta) {
bytesLeftInWriteWindow += delta;
if (delta > 0) SpdyConnection.this.notifyAll();
}
void writeSynResetLater(final int streamId, final ErrorCode errorCode) {
executor.submit(new NamedRunnable("OkHttp %s stream %d", hostName, streamId) {
@Override public void execute() {
@@ -468,7 +486,7 @@ public final class SpdyConnection implements Closeable {
}
}
private class Reader extends NamedRunnable implements FrameReader.Handler {
class Reader extends NamedRunnable implements FrameReader.Handler {
private Reader() {
super("OkHttp %s", hostName);
}
@@ -531,7 +549,7 @@ public final class SpdyConnection implements Closeable {
// Create a stream.
final SpdyStream newStream = new SpdyStream(streamId, SpdyConnection.this, outFinished,
inFinished, priority, headerBlock, peerSettings);
inFinished, priority, headerBlock, initialWindowSize);
lastGoodStreamId = streamId;
streams.put(streamId, newStream);
executor.submit(new NamedRunnable("OkHttp %s stream %d", hostName, streamId) {
@@ -577,6 +595,15 @@ public final class SpdyConnection implements Closeable {
if (getProtocol() == Protocol.HTTP_2) {
ackSettingsLater();
}
int peerInitialWindowSize = peerSettings.getInitialWindowSize();
if (peerInitialWindowSize != -1 && peerInitialWindowSize != initialWindowSize) {
long delta = peerInitialWindowSize - initialWindowSize;
SpdyConnection.this.initialWindowSize = peerInitialWindowSize;
if (!receivedInitialPeerSettings) {
addBytesToWriteWindow(delta);
receivedInitialPeerSettings = true;
}
}
if (!streams.isEmpty()) {
streamsToNotify = streams.values().toArray(new SpdyStream[streams.size()]);
}
@@ -607,6 +634,10 @@ public final class SpdyConnection implements Closeable {
});
}
@Override public void ackSettings() {
// TODO: If we don't get this callback after sending settings to the peer, SETTINGS_TIMEOUT.
}
@Override public void noop() {
}
@@ -646,7 +677,7 @@ public final class SpdyConnection implements Closeable {
if (streamId == 0) {
synchronized (SpdyConnection.this) {
bytesLeftInWriteWindow += windowSizeIncrement;
notifyAll();
SpdyConnection.this.notifyAll();
}
} else {
// TODO: honor endFlowControl

View File

@@ -55,7 +55,7 @@ public final class SpdyStream {
private List<Header> responseHeaders;
private final SpdyDataInputStream in;
private final SpdyDataOutputStream out;
final SpdyDataOutputStream out;
/**
* The reason why this stream was abnormally closed. If there are multiple
@@ -65,18 +65,19 @@ public final class SpdyStream {
private ErrorCode errorCode = null;
SpdyStream(int id, SpdyConnection connection, boolean outFinished, boolean inFinished,
int priority, List<Header> requestHeaders, Settings peerSettings) {
int priority, List<Header> requestHeaders, int initialWriteWindow) {
if (connection == null) throw new NullPointerException("connection == null");
if (requestHeaders == null) throw new NullPointerException("requestHeaders == null");
this.id = id;
this.connection = connection;
this.in = new SpdyDataInputStream(peerSettings.getInitialWindowSize());
this.writeWindowSize = initialWriteWindow;
this.windowUpdateThreshold = initialWriteWindow / 2;
this.in = new SpdyDataInputStream(initialWriteWindow);
this.out = new SpdyDataOutputStream();
this.in.finished = inFinished;
this.out.finished = outFinished;
this.priority = priority;
this.requestHeaders = requestHeaders;
setPeerSettings(peerSettings);
}
/**
@@ -311,11 +312,13 @@ public final class SpdyStream {
}
private void setPeerSettings(Settings peerSettings) {
// TODO: For HTTP/2.0, also adjust the stream flow control window size
// TODO: For HTTP/2, also adjust the stream flow control window size
// by the difference between the new value and the old value.
assert (Thread.holdsLock(connection)); // Because 'settings' is guarded by 'connection'.
long delta = peerSettings.getInitialWindowSize() - writeWindowSize;
this.writeWindowSize = peerSettings.getInitialWindowSize();
this.windowUpdateThreshold = peerSettings.getInitialWindowSize() / 2;
receiveWindowUpdate(delta);
}
/** Notification received when peer settings change. */

View File

@@ -41,6 +41,10 @@ class BaseTestHandler implements FrameReader.Handler {
fail();
}
@Override public void ackSettings() {
fail();
}
@Override public void noop() {
fail();
}

View File

@@ -210,6 +210,12 @@ public final class MockSpdyPeer implements Closeable {
this.settings = settings;
}
@Override public void ackSettings() {
if (this.type != -1) throw new IllegalStateException();
this.type = Spdy3.TYPE_SETTINGS;
this.ack = true;
}
@Override public void headers(boolean outFinished, boolean inFinished, int streamId,
int associatedStreamId, int priority, List<Header> headerBlock,
HeadersMode headersMode) {

View File

@@ -290,6 +290,41 @@ public final class SpdyConnectionTest {
assertFalse(pingFrame.ack);
}
@Test public void peerHttp2ServerLowersInitialWindowSize() throws Exception {
boolean client = false; // Peer is server, so we are client.
Settings initial = new Settings();
initial.set(Settings.INITIAL_WINDOW_SIZE, PERSIST_VALUE, 1684);
Settings shouldntImpactConnection = new Settings();
shouldntImpactConnection.set(Settings.INITIAL_WINDOW_SIZE, PERSIST_VALUE, 3368);
MockSpdyPeer peer = new MockSpdyPeer(HTTP_20_DRAFT_09, client);
peer.sendFrame().settings(initial);
peer.acceptFrame(); // ACK
peer.sendFrame().settings(shouldntImpactConnection);
peer.acceptFrame(); // ACK 2
peer.acceptFrame(); // HEADERS
peer.play();
SpdyConnection connection = connection(peer, HTTP_20_DRAFT_09);
// verify the peer received the ACK
MockSpdyPeer.InFrame ackFrame = peer.takeFrame();
assertEquals(TYPE_SETTINGS, ackFrame.type);
assertEquals(0, ackFrame.streamId);
assertTrue(ackFrame.ack);
ackFrame = peer.takeFrame();
assertEquals(TYPE_SETTINGS, ackFrame.type);
assertEquals(0, ackFrame.streamId);
assertTrue(ackFrame.ack);
// This stream was created *after* the connection settings were adjusted.
SpdyStream stream = connection.newStream(headerEntries("a", "android"), true, true);
assertEquals(3368, connection.initialWindowSize);
assertEquals(1684, connection.bytesLeftInWriteWindow); // initial wasn't affected.
assertEquals(1684, stream.windowUpdateThreshold);
}
@Test public void unexpectedPingIsNotReturned() throws Exception {
// write the mocking script
peer.sendFrame().ping(false, 2, 0);
@@ -327,7 +362,7 @@ public final class SpdyConnectionTest {
@Test public void peerHttp2ClientDisablesPush() throws Exception {
boolean client = false; // Peer is client, so we are server.
Settings settings = Http20Draft09.defaultSettings(client);
Settings settings = new Settings();
settings.set(Settings.ENABLE_PUSH, 0, 0); // The peer client disables push.
SpdyConnection connection = sendHttp2SettingsAndCheckForAck(client, settings);
@@ -1276,11 +1311,10 @@ public final class SpdyConnectionTest {
SpdyConnection connection = connection(peer, HTTP_20_DRAFT_09);
// verify the peer received the ACK
MockSpdyPeer.InFrame pingFrame = peer.takeFrame();
assertEquals(TYPE_SETTINGS, pingFrame.type);
assertEquals(0, pingFrame.streamId);
// TODO: check for ACK flag.
assertEquals(0, pingFrame.settings.size());
MockSpdyPeer.InFrame ackFrame = peer.takeFrame();
assertEquals(TYPE_SETTINGS, ackFrame.type);
assertEquals(0, ackFrame.streamId);
assertTrue(ackFrame.ack);
peer.close();
return connection;
}

View File

@@ -191,7 +191,7 @@ public final class Response {
/**
* Returns true if further data from this response body should be read at
* this time. For asynchronous protocols like SPDY and HTTP/2.0, this will
* this time. For asynchronous protocols like SPDY and HTTP/2, this will
* return false once all locally-available body bytes have been read.
*
* <p>Clients with many concurrent downloads can use this method to reduce