1
0
mirror of https://github.com/square/okhttp.git synced 2026-01-27 04:22:07 +03:00

Merge pull request #37 from square/jwilson/concurrency

Be more careful about synchronization.
This commit is contained in:
Jake Wharton
2012-09-25 12:06:17 -07:00
3 changed files with 131 additions and 102 deletions

View File

@@ -95,7 +95,7 @@ final class HttpConnectionPool {
*/
public void recycle(HttpConnection connection) {
if (connection.isSpdy()) {
throw new IllegalArgumentException();
throw new IllegalArgumentException(); // TODO: just 'return' here?
}
Socket socket = connection.getSocket();

View File

@@ -21,29 +21,42 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static libcore.net.spdy.Threads.newThreadFactory;
/**
* A socket connection to a remote peer. A connection hosts streams which can
* send and receive data.
*
* <p>Many methods in this API are <strong>synchronous:</strong> the call is
* completed before the method returns. This is typical for Java but atypical
* for SPDY. This is motivated by exception transparency: an IOException that
* was triggered by a certain caller can be caught and handled by that caller.
*/
public final class SpdyConnection implements Closeable {
/*
* Socket writes are guarded by this. Socket reads are unguarded but are
* only made by the reader thread.
* Internal state of this connection is guarded by 'this'. No blocking
* operations may be performed while holding this lock!
*
* Socket writes are guarded by spdyWriter.
*
* Socket reads are unguarded but are only made by the reader thread.
*
* Certain operations (like SYN_STREAM) need to synchronize on both the
* spdyWriter (to do blocking I/O) and this (to create streams). Such
* operations must synchronize on 'this' last. This ensures that we never
* wait for a blocking operation while holding 'this'.
*/
// TODO: break up synchronization: Sync on SpdyWriter and 'this' independently.
// SpdyWriter: I/O. Held for a long time.
// This: state, both incoming and outgoing. Settings, pings. Not held for a long time.
static final int FLAG_FIN = 0x1;
static final int FLAG_UNIDIRECTIONAL = 0x2;
@@ -80,42 +93,45 @@ public final class SpdyConnection implements Closeable {
static final int TYPE_HEADERS = 0x8;
static final int VERSION = 2;
/** Guarded by this. */
private int nextStreamId;
/** Guarded by this. */
private int nextPingId;
private final SpdyReader spdyReader;
private final SpdyWriter spdyWriter;
private final Executor executor;
private final ExecutorService readExecutor;
private final ExecutorService writeExecutor;
private final ExecutorService callbackExecutor;
/**
* User code to run in response to an incoming stream. Callbacks must not be
* run on the callback executor.
*/
private final IncomingStreamHandler handler;
private final Map<Integer, SpdyStream> streams = new HashMap<Integer, SpdyStream>();
private int nextStreamId;
/** Lazily-created map of in-flight pings awaiting a response. Guarded by this. */
private Map<Integer, Ping> pings;
private int nextPingId;
/** The maximum number of concurrent streams permitted by the peer, or -1 for no limit. */
int peerMaxConcurrentStreams;
/**
* User code to run in response to an incoming stream. This must not be run
* on the read thread, otherwise a deadlock is possible.
*/
private final IncomingStreamHandler handler;
private final Map<Integer, SpdyStream> streams = Collections.synchronizedMap(
new HashMap<Integer, SpdyStream>());
/** Lazily-created map of in-flight pings awaiting a response. Guarded by this. */
private Map<Integer, Ping> pings;
private SpdyConnection(Builder builder) {
nextStreamId = builder.client ? 1 : 2;
nextPingId = builder.client ? 1 : 2;
spdyReader = new SpdyReader(builder.in);
spdyWriter = new SpdyWriter(builder.out);
handler = builder.handler;
nextStreamId = builder.client ? 1 : 2;
nextPingId = builder.client ? 1 : 2;
clearSettings();
String name = isClient() ? "ClientReader" : "ServerReader";
executor = builder.executor != null
? builder.executor
: Executors.newCachedThreadPool(Threads.newThreadFactory(name, true));
executor.execute(new Reader());
String prefix = isClient() ? "Spdy Client " : "Spdy Server ";
readExecutor = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), newThreadFactory(prefix + "Reader", true));
writeExecutor = new ThreadPoolExecutor(0, 1, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), newThreadFactory(prefix + "Writer", true));
callbackExecutor = new ThreadPoolExecutor(1, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), newThreadFactory(prefix + "Callbacks", true));
readExecutor.execute(new Reader());
}
/**
@@ -145,15 +161,16 @@ public final class SpdyConnection implements Closeable {
}
}
private SpdyStream getStream(int id) {
private synchronized SpdyStream getStream(int id) {
SpdyStream stream = streams.get(id);
if (stream == null) {
throw new UnsupportedOperationException("TODO " + id + "; " + streams); // TODO: rst stream
// TODO: rst stream
throw new UnsupportedOperationException("TODO " + id + "; " + streams);
}
return stream;
}
void removeStream(int streamId) {
synchronized void removeStream(int streamId) {
streams.remove(streamId);
}
@@ -165,42 +182,52 @@ public final class SpdyConnection implements Closeable {
* @param in true to create an input stream that the remote peer can use to
* send data to us. Corresponds to {@code FLAG_UNIDIRECTIONAL}.
*/
public synchronized SpdyStream newStream(List<String> requestHeaders, boolean out, boolean in)
public SpdyStream newStream(List<String> requestHeaders, boolean out, boolean in)
throws IOException {
int streamId = nextStreamId; // TODO
nextStreamId += 2;
int flags = (out ? 0 : FLAG_FIN) | (in ? 0 : FLAG_UNIDIRECTIONAL);
int associatedStreamId = 0; // TODO
int priority = 0; // TODO
SpdyStream stream;
int streamId;
SpdyStream result = new SpdyStream(streamId, this, requestHeaders, flags);
streams.put(streamId, result);
synchronized (spdyWriter) {
synchronized (this) {
streamId = nextStreamId;
nextStreamId += 2;
stream = new SpdyStream(streamId, this, requestHeaders, flags);
streams.put(streamId, stream);
}
spdyWriter.flags = flags;
spdyWriter.id = streamId;
spdyWriter.associatedId = associatedStreamId;
spdyWriter.priority = priority;
spdyWriter.nameValueBlock = requestHeaders;
spdyWriter.synStream();
spdyWriter.flags = flags;
spdyWriter.id = streamId;
spdyWriter.associatedId = associatedStreamId;
spdyWriter.priority = priority;
spdyWriter.nameValueBlock = requestHeaders;
spdyWriter.synStream();
}
return result;
return stream;
}
synchronized void writeSynReply(int streamId, List<String> alternating) throws IOException {
int flags = 0; // TODO
spdyWriter.flags = flags;
spdyWriter.id = streamId;
spdyWriter.nameValueBlock = alternating;
spdyWriter.synReply();
void writeSynReply(int streamId, List<String> alternating) throws IOException {
synchronized (spdyWriter) {
int flags = 0; // TODO
spdyWriter.flags = flags;
spdyWriter.id = streamId;
spdyWriter.nameValueBlock = alternating;
spdyWriter.synReply();
}
}
/** Writes a complete data frame. */
synchronized void writeFrame(byte[] bytes, int offset, int length) throws IOException {
spdyWriter.out.write(bytes, offset, length);
void writeFrame(byte[] bytes, int offset, int length) throws IOException {
synchronized (spdyWriter) {
spdyWriter.out.write(bytes, offset, length);
}
}
void writeSynResetLater(final int streamId, final int statusCode) {
executor.execute(new Runnable() {
writeExecutor.execute(new Runnable() {
@Override public void run() {
try {
writeSynReset(streamId, statusCode);
@@ -210,33 +237,32 @@ public final class SpdyConnection implements Closeable {
});
}
synchronized void writeSynReset(int streamId, int statusCode) throws IOException {
int flags = 0; // TODO
spdyWriter.flags = flags;
spdyWriter.id = streamId;
spdyWriter.statusCode = statusCode;
spdyWriter.synReset();
void writeSynReset(int streamId, int statusCode) throws IOException {
synchronized (spdyWriter) {
int flags = 0; // TODO
spdyWriter.flags = flags;
spdyWriter.id = streamId;
spdyWriter.statusCode = statusCode;
spdyWriter.synReset();
}
}
/**
* Sends a ping to the peer. Use the returned object to await the ping's
* response and observe its round trip time.
* Sends a ping frame to the peer. Use the returned object to await the
* ping's response and observe its round trip time.
*/
public Ping ping() throws IOException {
public synchronized Ping ping() {
Ping ping = new Ping();
int pingId;
synchronized (this) {
if (pings == null) pings = new HashMap<Integer, Ping>();
pingId = nextPingId;
nextPingId += 2;
pings.put(pingId, ping);
}
int pingId = nextPingId;
nextPingId += 2;
if (pings == null) pings = new HashMap<Integer, Ping>();
pings.put(pingId, ping);
writePingLater(pingId, ping);
return ping;
}
void writePingLater(final int id, final Ping ping) {
executor.execute(new Runnable() {
private void writePingLater(final int id, final Ping ping) {
writeExecutor.execute(new Runnable() {
@Override public void run() {
try {
writePing(id, ping);
@@ -246,31 +272,39 @@ public final class SpdyConnection implements Closeable {
});
}
synchronized void writePing(int id, Ping ping) throws IOException {
spdyWriter.flags = 0;
spdyWriter.id = id;
private void writePing(int id, Ping ping) throws IOException {
// Observe the sent time immediately before performing I/O.
if (ping != null) ping.send();
spdyWriter.ping();
synchronized (spdyWriter) {
spdyWriter.flags = 0;
spdyWriter.id = id;
spdyWriter.ping();
}
}
public synchronized void flush() throws IOException {
spdyWriter.out.flush();
private synchronized Ping removePing(int id) {
return pings != null ? pings.remove(id) : null;
}
@Override public synchronized void close() throws IOException {
public void flush() throws IOException {
synchronized (spdyWriter) {
spdyWriter.out.flush();
}
}
@Override public void close() throws IOException {
// TODO: graceful close; send RST frames
// TODO: close all streams to release waiting readers
if (executor instanceof ExecutorService) {
((ExecutorService) executor).shutdown();
}
writeExecutor.shutdown();
readExecutor.shutdown();
callbackExecutor.shutdown();
}
public static class Builder {
private InputStream in;
private OutputStream out;
private IncomingStreamHandler handler = IncomingStreamHandler.REFUSE_INCOMING_STREAMS;
private Executor executor;
public boolean client;
/**
@@ -291,11 +325,6 @@ public final class SpdyConnection implements Closeable {
this.out = out;
}
public Builder executor(Executor executor) {
this.executor = executor;
return this;
}
public Builder handler(IncomingStreamHandler handler) {
this.handler = handler;
return this;
@@ -337,7 +366,7 @@ public final class SpdyConnection implements Closeable {
if (previous != null) {
previous.close(SpdyStream.RST_PROTOCOL_ERROR);
}
executor.execute(new Runnable() {
callbackExecutor.execute(new Runnable() {
@Override public void run() {
try {
handler.receive(stream);
@@ -386,10 +415,7 @@ public final class SpdyConnection implements Closeable {
// Respond to a client ping if this is a server and vice versa.
writePingLater(id, null);
} else {
Ping ping;
synchronized (this) {
ping = pings != null ? pings.remove(id) : null;
}
Ping ping = removePing(id);
if (ping != null) {
ping.receive();
}

View File

@@ -139,7 +139,7 @@ public final class SpdyStream {
* Sends a reply.
*/
// TODO: support reply with FIN
public synchronized OutputStream reply(List<String> responseHeaders) throws IOException {
public OutputStream reply(List<String> responseHeaders) throws IOException {
if (responseHeaders == null) {
throw new NullPointerException("responseHeaders == null");
}
@@ -159,16 +159,19 @@ public final class SpdyStream {
/**
* Abnormally terminate this stream.
*/
public synchronized void close(int rstStatusCode) {
// TODO: no-op if inFinished == true and outFinished == true ?
if (this.rstStatusCode != -1) {
public void close(int rstStatusCode) {
synchronized (this) {
// TODO: no-op if inFinished == true and outFinished == true ?
if (this.rstStatusCode == -1) {
return; // Already closed.
}
this.rstStatusCode = rstStatusCode;
inFinished = true;
outFinished = true;
connection.removeStream(id);
notifyAll();
connection.writeSynResetLater(id, rstStatusCode);
}
connection.removeStream(id);
}
synchronized void receiveReply(List<String> strings) throws IOException {