diff --git a/src/main/java/libcore/net/http/HttpConnectionPool.java b/src/main/java/libcore/net/http/HttpConnectionPool.java index 490c98adc..2bcdc485f 100644 --- a/src/main/java/libcore/net/http/HttpConnectionPool.java +++ b/src/main/java/libcore/net/http/HttpConnectionPool.java @@ -95,7 +95,7 @@ final class HttpConnectionPool { */ public void recycle(HttpConnection connection) { if (connection.isSpdy()) { - throw new IllegalArgumentException(); + throw new IllegalArgumentException(); // TODO: just 'return' here? } Socket socket = connection.getSocket(); diff --git a/src/main/java/libcore/net/spdy/SpdyConnection.java b/src/main/java/libcore/net/spdy/SpdyConnection.java index 701c98650..4f84b9ef9 100644 --- a/src/main/java/libcore/net/spdy/SpdyConnection.java +++ b/src/main/java/libcore/net/spdy/SpdyConnection.java @@ -21,29 +21,42 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import static libcore.net.spdy.Threads.newThreadFactory; /** * A socket connection to a remote peer. A connection hosts streams which can * send and receive data. + * + *

Many methods in this API are synchronous: the call is + * completed before the method returns. This is typical for Java but atypical + * for SPDY. This is motivated by exception transparency: an IOException that + * was triggered by a certain caller can be caught and handled by that caller. */ public final class SpdyConnection implements Closeable { /* - * Socket writes are guarded by this. Socket reads are unguarded but are - * only made by the reader thread. + * Internal state of this connection is guarded by 'this'. No blocking + * operations may be performed while holding this lock! + * + * Socket writes are guarded by spdyWriter. + * + * Socket reads are unguarded but are only made by the reader thread. + * + * Certain operations (like SYN_STREAM) need to synchronize on both the + * spdyWriter (to do blocking I/O) and this (to create streams). Such + * operations must synchronize on 'this' last. This ensures that we never + * wait for a blocking operation while holding 'this'. */ - // TODO: break up synchronization: Sync on SpdyWriter and 'this' independently. - // SpdyWriter: I/O. Held for a long time. - // This: state, both incoming and outgoing. Settings, pings. Not held for a long time. - static final int FLAG_FIN = 0x1; static final int FLAG_UNIDIRECTIONAL = 0x2; @@ -80,42 +93,45 @@ public final class SpdyConnection implements Closeable { static final int TYPE_HEADERS = 0x8; static final int VERSION = 2; - /** Guarded by this. */ - private int nextStreamId; - /** Guarded by this. */ - private int nextPingId; private final SpdyReader spdyReader; private final SpdyWriter spdyWriter; - private final Executor executor; + private final ExecutorService readExecutor; + private final ExecutorService writeExecutor; + private final ExecutorService callbackExecutor; + + /** + * User code to run in response to an incoming stream. Callbacks must not be + * run on the callback executor. + */ + private final IncomingStreamHandler handler; + + private final Map streams = new HashMap(); + private int nextStreamId; + + /** Lazily-created map of in-flight pings awaiting a response. Guarded by this. */ + private Map pings; + private int nextPingId; /** The maximum number of concurrent streams permitted by the peer, or -1 for no limit. */ int peerMaxConcurrentStreams; - /** - * User code to run in response to an incoming stream. This must not be run - * on the read thread, otherwise a deadlock is possible. - */ - private final IncomingStreamHandler handler; - - private final Map streams = Collections.synchronizedMap( - new HashMap()); - - /** Lazily-created map of in-flight pings awaiting a response. Guarded by this. */ - private Map pings; - private SpdyConnection(Builder builder) { - nextStreamId = builder.client ? 1 : 2; - nextPingId = builder.client ? 1 : 2; spdyReader = new SpdyReader(builder.in); spdyWriter = new SpdyWriter(builder.out); handler = builder.handler; + nextStreamId = builder.client ? 1 : 2; + nextPingId = builder.client ? 1 : 2; clearSettings(); - String name = isClient() ? "ClientReader" : "ServerReader"; - executor = builder.executor != null - ? builder.executor - : Executors.newCachedThreadPool(Threads.newThreadFactory(name, true)); - executor.execute(new Reader()); + String prefix = isClient() ? "Spdy Client " : "Spdy Server "; + readExecutor = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, + new SynchronousQueue(), newThreadFactory(prefix + "Reader", true)); + writeExecutor = new ThreadPoolExecutor(0, 1, 60, TimeUnit.SECONDS, + new LinkedBlockingQueue(), newThreadFactory(prefix + "Writer", true)); + callbackExecutor = new ThreadPoolExecutor(1, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, + new SynchronousQueue(), newThreadFactory(prefix + "Callbacks", true)); + + readExecutor.execute(new Reader()); } /** @@ -145,15 +161,16 @@ public final class SpdyConnection implements Closeable { } } - private SpdyStream getStream(int id) { + private synchronized SpdyStream getStream(int id) { SpdyStream stream = streams.get(id); if (stream == null) { - throw new UnsupportedOperationException("TODO " + id + "; " + streams); // TODO: rst stream + // TODO: rst stream + throw new UnsupportedOperationException("TODO " + id + "; " + streams); } return stream; } - void removeStream(int streamId) { + synchronized void removeStream(int streamId) { streams.remove(streamId); } @@ -165,42 +182,52 @@ public final class SpdyConnection implements Closeable { * @param in true to create an input stream that the remote peer can use to * send data to us. Corresponds to {@code FLAG_UNIDIRECTIONAL}. */ - public synchronized SpdyStream newStream(List requestHeaders, boolean out, boolean in) + public SpdyStream newStream(List requestHeaders, boolean out, boolean in) throws IOException { - int streamId = nextStreamId; // TODO - nextStreamId += 2; int flags = (out ? 0 : FLAG_FIN) | (in ? 0 : FLAG_UNIDIRECTIONAL); int associatedStreamId = 0; // TODO int priority = 0; // TODO + SpdyStream stream; + int streamId; - SpdyStream result = new SpdyStream(streamId, this, requestHeaders, flags); - streams.put(streamId, result); + synchronized (spdyWriter) { + synchronized (this) { + streamId = nextStreamId; + nextStreamId += 2; + stream = new SpdyStream(streamId, this, requestHeaders, flags); + streams.put(streamId, stream); + } - spdyWriter.flags = flags; - spdyWriter.id = streamId; - spdyWriter.associatedId = associatedStreamId; - spdyWriter.priority = priority; - spdyWriter.nameValueBlock = requestHeaders; - spdyWriter.synStream(); + spdyWriter.flags = flags; + spdyWriter.id = streamId; + spdyWriter.associatedId = associatedStreamId; + spdyWriter.priority = priority; + spdyWriter.nameValueBlock = requestHeaders; + spdyWriter.synStream(); + } - return result; + return stream; } - synchronized void writeSynReply(int streamId, List alternating) throws IOException { - int flags = 0; // TODO - spdyWriter.flags = flags; - spdyWriter.id = streamId; - spdyWriter.nameValueBlock = alternating; - spdyWriter.synReply(); + void writeSynReply(int streamId, List alternating) throws IOException { + synchronized (spdyWriter) { + int flags = 0; // TODO + spdyWriter.flags = flags; + spdyWriter.id = streamId; + spdyWriter.nameValueBlock = alternating; + spdyWriter.synReply(); + } } /** Writes a complete data frame. */ - synchronized void writeFrame(byte[] bytes, int offset, int length) throws IOException { - spdyWriter.out.write(bytes, offset, length); + void writeFrame(byte[] bytes, int offset, int length) throws IOException { + synchronized (spdyWriter) { + spdyWriter.out.write(bytes, offset, length); + } } void writeSynResetLater(final int streamId, final int statusCode) { - executor.execute(new Runnable() { + writeExecutor.execute(new Runnable() { @Override public void run() { try { writeSynReset(streamId, statusCode); @@ -210,33 +237,32 @@ public final class SpdyConnection implements Closeable { }); } - synchronized void writeSynReset(int streamId, int statusCode) throws IOException { - int flags = 0; // TODO - spdyWriter.flags = flags; - spdyWriter.id = streamId; - spdyWriter.statusCode = statusCode; - spdyWriter.synReset(); + void writeSynReset(int streamId, int statusCode) throws IOException { + synchronized (spdyWriter) { + int flags = 0; // TODO + spdyWriter.flags = flags; + spdyWriter.id = streamId; + spdyWriter.statusCode = statusCode; + spdyWriter.synReset(); + } } /** - * Sends a ping to the peer. Use the returned object to await the ping's - * response and observe its round trip time. + * Sends a ping frame to the peer. Use the returned object to await the + * ping's response and observe its round trip time. */ - public Ping ping() throws IOException { + public synchronized Ping ping() { Ping ping = new Ping(); - int pingId; - synchronized (this) { - if (pings == null) pings = new HashMap(); - pingId = nextPingId; - nextPingId += 2; - pings.put(pingId, ping); - } + int pingId = nextPingId; + nextPingId += 2; + if (pings == null) pings = new HashMap(); + pings.put(pingId, ping); writePingLater(pingId, ping); return ping; } - void writePingLater(final int id, final Ping ping) { - executor.execute(new Runnable() { + private void writePingLater(final int id, final Ping ping) { + writeExecutor.execute(new Runnable() { @Override public void run() { try { writePing(id, ping); @@ -246,31 +272,39 @@ public final class SpdyConnection implements Closeable { }); } - synchronized void writePing(int id, Ping ping) throws IOException { - spdyWriter.flags = 0; - spdyWriter.id = id; + private void writePing(int id, Ping ping) throws IOException { // Observe the sent time immediately before performing I/O. if (ping != null) ping.send(); - spdyWriter.ping(); + + synchronized (spdyWriter) { + spdyWriter.flags = 0; + spdyWriter.id = id; + spdyWriter.ping(); + } } - public synchronized void flush() throws IOException { - spdyWriter.out.flush(); + private synchronized Ping removePing(int id) { + return pings != null ? pings.remove(id) : null; } - @Override public synchronized void close() throws IOException { + public void flush() throws IOException { + synchronized (spdyWriter) { + spdyWriter.out.flush(); + } + } + + @Override public void close() throws IOException { // TODO: graceful close; send RST frames // TODO: close all streams to release waiting readers - if (executor instanceof ExecutorService) { - ((ExecutorService) executor).shutdown(); - } + writeExecutor.shutdown(); + readExecutor.shutdown(); + callbackExecutor.shutdown(); } public static class Builder { private InputStream in; private OutputStream out; private IncomingStreamHandler handler = IncomingStreamHandler.REFUSE_INCOMING_STREAMS; - private Executor executor; public boolean client; /** @@ -291,11 +325,6 @@ public final class SpdyConnection implements Closeable { this.out = out; } - public Builder executor(Executor executor) { - this.executor = executor; - return this; - } - public Builder handler(IncomingStreamHandler handler) { this.handler = handler; return this; @@ -337,7 +366,7 @@ public final class SpdyConnection implements Closeable { if (previous != null) { previous.close(SpdyStream.RST_PROTOCOL_ERROR); } - executor.execute(new Runnable() { + callbackExecutor.execute(new Runnable() { @Override public void run() { try { handler.receive(stream); @@ -386,10 +415,7 @@ public final class SpdyConnection implements Closeable { // Respond to a client ping if this is a server and vice versa. writePingLater(id, null); } else { - Ping ping; - synchronized (this) { - ping = pings != null ? pings.remove(id) : null; - } + Ping ping = removePing(id); if (ping != null) { ping.receive(); } diff --git a/src/main/java/libcore/net/spdy/SpdyStream.java b/src/main/java/libcore/net/spdy/SpdyStream.java index 4588a4a1b..e781dea89 100644 --- a/src/main/java/libcore/net/spdy/SpdyStream.java +++ b/src/main/java/libcore/net/spdy/SpdyStream.java @@ -139,7 +139,7 @@ public final class SpdyStream { * Sends a reply. */ // TODO: support reply with FIN - public synchronized OutputStream reply(List responseHeaders) throws IOException { + public OutputStream reply(List responseHeaders) throws IOException { if (responseHeaders == null) { throw new NullPointerException("responseHeaders == null"); } @@ -159,16 +159,19 @@ public final class SpdyStream { /** * Abnormally terminate this stream. */ - public synchronized void close(int rstStatusCode) { - // TODO: no-op if inFinished == true and outFinished == true ? - if (this.rstStatusCode != -1) { + public void close(int rstStatusCode) { + synchronized (this) { + // TODO: no-op if inFinished == true and outFinished == true ? + if (this.rstStatusCode == -1) { + return; // Already closed. + } this.rstStatusCode = rstStatusCode; inFinished = true; outFinished = true; - connection.removeStream(id); notifyAll(); connection.writeSynResetLater(id, rstStatusCode); } + connection.removeStream(id); } synchronized void receiveReply(List strings) throws IOException {