From 7a7ad1cdb22ed381c468ee38d757da03b4b1877a Mon Sep 17 00:00:00 2001 From: Jesse Wilson Date: Fri, 21 Sep 2012 17:28:07 -0400 Subject: [PATCH] Handle incoming SETTINGS frames in SPDY. --- .../java/libcore/net/spdy/SpdyConnection.java | 66 ++++++++++++++++++- .../java/libcore/net/spdy/SpdyReader.java | 5 +- .../java/libcore/net/spdy/SpdyWriter.java | 34 ++++++++++ src/main/java/libcore/net/spdy/Threads.java | 6 +- .../java/libcore/net/spdy/MockSpdyPeer.java | 2 +- .../libcore/net/spdy/SpdyConnectionTest.java | 29 ++++++++ 6 files changed, 135 insertions(+), 7 deletions(-) diff --git a/src/main/java/libcore/net/spdy/SpdyConnection.java b/src/main/java/libcore/net/spdy/SpdyConnection.java index 44194f125..9dd08e45f 100644 --- a/src/main/java/libcore/net/spdy/SpdyConnection.java +++ b/src/main/java/libcore/net/spdy/SpdyConnection.java @@ -43,6 +43,27 @@ public final class SpdyConnection implements Closeable { static final int FLAG_FIN = 0x01; static final int FLAG_UNIDIRECTIONAL = 0x02; + /** Peer request to clear durable settings. */ + static final int FLAG_SETTINGS_CLEAR_PREVIOUSLY_PERSISTED_SETTINGS = 0x01; + /** Sent by servers only. The peer requests this setting persisted for future connections. */ + static final int FLAG_SETTINGS_PERSIST_VALUE = 0x1; + /** Sent by clients only. The client is reminding the server of a persisted value. */ + static final int FLAG_SETTINGS_PERSISTED = 0x2; + /** Sender's estimate of max incoming kbps. */ + static final int SETTINGS_UPLOAD_BANDWIDTH = 0x01; + /** Sender's estimate of max outgoing kbps. */ + static final int SETTINGS_DOWNLOAD_BANDWIDTH = 0x02; + /** Sender's estimate of milliseconds between sending a request and receiving a response. */ + static final int SETTINGS_ROUND_TRIP_TIME = 0x03; + /** Sender's maximum number of concurrent streams. */ + static final int SETTINGS_MAX_CONCURRENT_STREAMS = 0x04; + /** Current CWND in Packets. */ + static final int SETTINGS_CURRENT_CWND = 0x05; + /** Retransmission rate. Percentage */ + static final int SETTINGS_DOWNLOAD_RETRANS_RATE = 0x06; + /** Window size in bytes. */ + static final int SETTINGS_INITIAL_WINDOW_SIZE = 0x07; + static final int TYPE_EOF = -1; static final int TYPE_DATA = 0x00; static final int TYPE_SYN_STREAM = 0x01; @@ -61,6 +82,9 @@ public final class SpdyConnection implements Closeable { private final SpdyWriter spdyWriter; private final Executor executor; + /** The maximum number of concurrent streams permitted by the peer, or -1 for no limit. */ + int peerMaxConcurrentStreams; + /** * User code to run in response to an incoming stream. This must not be run * on the read thread, otherwise a deadlock is possible. @@ -75,11 +99,12 @@ public final class SpdyConnection implements Closeable { spdyReader = new SpdyReader(builder.in); spdyWriter = new SpdyWriter(builder.out); handler = builder.handler; + clearSettings(); String name = isClient() ? "ClientReader" : "ServerReader"; executor = builder.executor != null ? builder.executor - : Executors.newCachedThreadPool(Threads.newThreadFactory(name)); + : Executors.newCachedThreadPool(Threads.newThreadFactory(name, true)); executor.execute(new Reader()); } @@ -90,6 +115,26 @@ public final class SpdyConnection implements Closeable { return nextStreamId % 2 == 1; } + /** + * Resets this connection's settings to their default values. + */ + private synchronized void clearSettings() { + peerMaxConcurrentStreams = -1; + } + + /** + * Receive an incoming setting from a peer. This SPDY client doesn't care + * about most settings, and so it doesn't save them. + * https://github.com/square/okhttp/issues/32 + */ + private synchronized void receiveSetting(int id, int idFlags, int value) { + switch (id) { + case SETTINGS_MAX_CONCURRENT_STREAMS: + peerMaxConcurrentStreams = value; + break; + } + } + private SpdyStream getStream(int id) { SpdyStream stream = streams.get(id); if (stream == null) { @@ -264,8 +309,23 @@ public final class SpdyConnection implements Closeable { return true; case SpdyConnection.TYPE_SETTINGS: - // TODO: implement - System.out.println("Unimplemented TYPE_SETTINGS frame discarded"); + int numberOfEntries = spdyReader.in.readInt(); + if (spdyReader.length != 4 + numberOfEntries * 8) { + // TODO: DIE + } + if ((spdyReader.flags & FLAG_SETTINGS_CLEAR_PREVIOUSLY_PERSISTED_SETTINGS) != 0) { + clearSettings(); + } + for (int i = 0; i < numberOfEntries; i++) { + int w1 = spdyReader.in.readInt(); + int value = spdyReader.in.readInt(); + // The ID is a 24 bit little-endian value, so 0xabcdefxx becomes 0x00efcdab. + int id = ((w1 & 0xff000000) >>> 24) + | ((w1 & 0xff0000) >>> 8) + | ((w1 & 0xff00) << 8); + int idFlags = (w1 & 0xff); + receiveSetting(id, idFlags, value); + } return true; case SpdyConnection.TYPE_NOOP: diff --git a/src/main/java/libcore/net/spdy/SpdyReader.java b/src/main/java/libcore/net/spdy/SpdyReader.java index 38557c95b..8e619895d 100644 --- a/src/main/java/libcore/net/spdy/SpdyReader.java +++ b/src/main/java/libcore/net/spdy/SpdyReader.java @@ -112,6 +112,9 @@ final class SpdyReader { readSynReset(); return SpdyConnection.TYPE_RST_STREAM; + case SpdyConnection.TYPE_SETTINGS: + return SpdyConnection.TYPE_SETTINGS; + default: readControlFrame(); return type; @@ -128,7 +131,7 @@ final class SpdyReader { int s3 = in.readShort(); streamId = w1 & 0x7fffffff; associatedStreamId = w2 & 0x7fffffff; - priority = s3 & 0xc000 >> 14; + priority = s3 & 0xc000 >>> 14; // int unused = s3 & 0x3fff; nameValueBlock = readNameValueBlock(length - 10); } diff --git a/src/main/java/libcore/net/spdy/SpdyWriter.java b/src/main/java/libcore/net/spdy/SpdyWriter.java index 5bc46449e..c234a74c7 100644 --- a/src/main/java/libcore/net/spdy/SpdyWriter.java +++ b/src/main/java/libcore/net/spdy/SpdyWriter.java @@ -38,6 +38,7 @@ final class SpdyWriter { public List nameValueBlock; private final ByteArrayOutputStream nameValueBlockBuffer; private final DataOutputStream nameValueBlockOut; + private int settingsRemaining = 0; SpdyWriter(OutputStream out) { this.out = new DataOutputStream(out); @@ -85,6 +86,7 @@ final class SpdyWriter { out.writeInt((flags & 0xff) << 24 | length & 0xffffff); out.writeInt(streamId & 0x7fffffff); out.writeInt(statusCode); + out.flush(); } public void data(byte[] data) throws IOException { @@ -105,4 +107,36 @@ final class SpdyWriter { } nameValueBlockOut.flush(); } + + /** + * Begins a settings frame with {@code numberOfEntries} settings. Calls to + * this method must be followed by {@code numberOfEntries} + * calls to {@link #setting}. + */ + public void settings(int numberOfEntries) throws IOException { + if (settingsRemaining != 0) throw new IllegalStateException(); + settingsRemaining = numberOfEntries; + int type = SpdyConnection.TYPE_SETTINGS; + int length = 4 + numberOfEntries * 8; + out.writeInt(0x80000000 | (SpdyConnection.VERSION & 0x7fff) << 16 | type & 0xffff); + out.writeInt((flags & 0xff) << 24 | length & 0xffffff); + out.writeInt(numberOfEntries); + } + + /** + * Writes a single setting. Must be preceded by a call to {@link #settings}. + */ + public void setting(int settingId, int settingFlag, int value) throws IOException { + if (settingsRemaining < 1) throw new IllegalStateException(); + settingsRemaining--; + // settingId 0x00efcdab and settingFlag 0x12 combine to 0xabcdef12. + out.writeInt(((settingId & 0xff0000) >>> 8) + | ((settingId & 0xff00) << 8) + | ((settingId & 0xff) << 24) + | (settingFlag & 0xff)); + out.writeInt(value); + if (settingsRemaining == 0) { + out.flush(); + } + } } diff --git a/src/main/java/libcore/net/spdy/Threads.java b/src/main/java/libcore/net/spdy/Threads.java index a1fbf67de..9e257f3b3 100644 --- a/src/main/java/libcore/net/spdy/Threads.java +++ b/src/main/java/libcore/net/spdy/Threads.java @@ -19,10 +19,12 @@ package libcore.net.spdy; import java.util.concurrent.ThreadFactory; final class Threads { - public static ThreadFactory newThreadFactory(final String name) { + public static ThreadFactory newThreadFactory(final String name, final boolean daemon) { return new ThreadFactory() { @Override public Thread newThread(Runnable r) { - return new Thread(r, name); + Thread result = new Thread(r, name); + result.setDaemon(daemon); + return result; } }; } diff --git a/src/test/java/libcore/net/spdy/MockSpdyPeer.java b/src/test/java/libcore/net/spdy/MockSpdyPeer.java index 0ea3d867b..5066c5fa0 100644 --- a/src/test/java/libcore/net/spdy/MockSpdyPeer.java +++ b/src/test/java/libcore/net/spdy/MockSpdyPeer.java @@ -40,7 +40,7 @@ public final class MockSpdyPeer { private final BlockingQueue inFrames = new LinkedBlockingQueue(); private int port; private final Executor executor = Executors.newCachedThreadPool( - Threads.newThreadFactory("MockSpdyPeer")); + Threads.newThreadFactory("MockSpdyPeer", true)); public void acceptFrame() { frameCount++; diff --git a/src/test/java/libcore/net/spdy/SpdyConnectionTest.java b/src/test/java/libcore/net/spdy/SpdyConnectionTest.java index 09172fa4c..7e6a128b9 100644 --- a/src/test/java/libcore/net/spdy/SpdyConnectionTest.java +++ b/src/test/java/libcore/net/spdy/SpdyConnectionTest.java @@ -93,4 +93,33 @@ public final class SpdyConnectionTest extends TestCase { assertEquals(0, synStream.reader.associatedStreamId); assertEquals(Arrays.asList("b", "banana"), synStream.reader.nameValueBlock); } + + public void testServerSendsSettingsToClient() throws Exception { + // write the mocking script + SpdyWriter newStream = peer.sendFrame(); + newStream.flags = SpdyConnection.FLAG_SETTINGS_CLEAR_PREVIOUSLY_PERSISTED_SETTINGS; + newStream.settings(1); + newStream.setting(SpdyConnection.SETTINGS_MAX_CONCURRENT_STREAMS, + SpdyConnection.FLAG_SETTINGS_PERSIST_VALUE, 10); + // TODO: send a 'ping' frame. + peer.play(); + + // play it back + IncomingStreamHandler handler = new IncomingStreamHandler() { + @Override public void receive(SpdyStream stream) throws IOException { + assertEquals(Arrays.asList("a", "android"), stream.getRequestHeaders()); + assertEquals(-1, stream.getRstStatusCode()); + stream.reply(Arrays.asList("b", "banana")); + } + }; + SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()) + .handler(handler) + .build(); + + // TODO: wait for the ping response, which better and faster than this fragile sleep. + Thread.sleep(1000); + synchronized (connection) { + assertEquals(10, connection.peerMaxConcurrentStreams); + } + } }