diff --git a/src/main/java/com/squareup/okhttp/Connection.java b/src/main/java/com/squareup/okhttp/Connection.java index 1679cd520..90424ffe7 100644 --- a/src/main/java/com/squareup/okhttp/Connection.java +++ b/src/main/java/com/squareup/okhttp/Connection.java @@ -164,7 +164,7 @@ public final class Connection implements Closeable { if (modernTls && (selectedProtocol = platform.getNpnSelectedProtocol(sslSocket)) != null) { if (Arrays.equals(selectedProtocol, SPDY3)) { sslSocket.setSoTimeout(0); // SPDY timeouts are set per-stream. - spdyConnection = new SpdyConnection.Builder(true, in, out).build(); + spdyConnection = new SpdyConnection.Builder(address.getUriHost(), true, in, out).build(); } else if (!Arrays.equals(selectedProtocol, HTTP_11)) { throw new IOException( "Unexpected NPN transport " + new String(selectedProtocol, "ISO-8859-1")); diff --git a/src/main/java/com/squareup/okhttp/internal/NamedRunnable.java b/src/main/java/com/squareup/okhttp/internal/NamedRunnable.java new file mode 100644 index 000000000..ce430b27b --- /dev/null +++ b/src/main/java/com/squareup/okhttp/internal/NamedRunnable.java @@ -0,0 +1,40 @@ +/* + * Copyright (C) 2013 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.squareup.okhttp.internal; + +/** + * Runnable implementation which always sets its thread name. + */ +public abstract class NamedRunnable implements Runnable { + private String name; + + public NamedRunnable(String name) { + this.name = name; + } + + @Override public final void run() { + String oldName = Thread.currentThread().getName(); + Thread.currentThread().setName(name); + try { + execute(); + } finally { + Thread.currentThread().setName(oldName); + } + } + + protected abstract void execute(); +} diff --git a/src/main/java/com/squareup/okhttp/internal/Util.java b/src/main/java/com/squareup/okhttp/internal/Util.java index 994a6461f..f01e091fc 100644 --- a/src/main/java/com/squareup/okhttp/internal/Util.java +++ b/src/main/java/com/squareup/okhttp/internal/Util.java @@ -189,9 +189,7 @@ public final class Util { out.write(buffer); } - /** - * Fills 'dst' with bytes from 'in', throwing EOFException if insufficient bytes are available. - */ + /** Fills 'dst' with bytes from 'in', throwing EOFException if insufficient bytes are available. */ public static void readFully(InputStream in, byte[] dst) throws IOException { readFully(in, dst, 0, dst.length); } @@ -323,14 +321,4 @@ public final class Util { } return result.toString(); } - - public static ThreadFactory newThreadFactory(final String name, final boolean daemon) { - return new ThreadFactory() { - @Override public Thread newThread(Runnable r) { - Thread result = new Thread(r, name); - result.setDaemon(daemon); - return result; - } - }; - } } diff --git a/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java b/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java index 4100b9ea3..b3e248c34 100644 --- a/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java +++ b/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java @@ -16,6 +16,7 @@ package com.squareup.okhttp.internal.spdy; +import com.squareup.okhttp.internal.NamedRunnable; import com.squareup.okhttp.internal.Util; import java.io.Closeable; import java.io.IOException; @@ -27,11 +28,12 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import static java.util.concurrent.Executors.defaultThreadFactory; + /** * A socket connection to a remote peer. A connection hosts streams which can * send and receive data. @@ -75,6 +77,10 @@ public final class SpdyConnection implements Closeable { static final int GOAWAY_PROTOCOL_ERROR = 1; static final int GOAWAY_INTERNAL_ERROR = 2; + private static final ExecutorService executor = + new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, + new SynchronousQueue(), defaultThreadFactory()); + /** True if this peer initiated the connection. */ final boolean client; @@ -83,14 +89,11 @@ public final class SpdyConnection implements Closeable { * run on the callback executor. */ private final IncomingStreamHandler handler; - private final SpdyReader spdyReader; private final SpdyWriter spdyWriter; - private final ExecutorService readExecutor; - private final ExecutorService writeExecutor; - private final ExecutorService callbackExecutor; private final Map streams = new HashMap(); + private final String hostName; private int lastGoodStreamId; private int nextStreamId; private boolean shutdown; @@ -111,17 +114,9 @@ public final class SpdyConnection implements Closeable { nextStreamId = builder.client ? 1 : 2; nextPingId = builder.client ? 1 : 2; - String prefix = builder.client ? "Spdy Client " : "Spdy Server "; - readExecutor = - new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new SynchronousQueue(), - Util.newThreadFactory(prefix + "Reader", false)); - writeExecutor = - new ThreadPoolExecutor(0, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue(), - Util.newThreadFactory(prefix + "Writer", false)); - callbackExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, - new SynchronousQueue(), Util.newThreadFactory(prefix + "Callbacks", false)); + hostName = builder.hostName; - readExecutor.execute(new Reader()); + new Thread(new Reader(), "Spdy Reader " + hostName).start(); } /** @@ -207,14 +202,15 @@ public final class SpdyConnection implements Closeable { } void writeSynResetLater(final int streamId, final int statusCode) { - writeExecutor.execute(new Runnable() { - @Override public void run() { - try { - writeSynReset(streamId, statusCode); - } catch (IOException ignored) { - } - } - }); + executor.submit( + new NamedRunnable(String.format("Spdy Writer %s stream %d", hostName, streamId)) { + @Override public void execute() { + try { + writeSynReset(streamId, statusCode); + } catch (IOException ignored) { + } + } + }); } void writeSynReset(int streamId, int statusCode) throws IOException { @@ -222,14 +218,15 @@ public final class SpdyConnection implements Closeable { } void writeWindowUpdateLater(final int streamId, final int deltaWindowSize) { - writeExecutor.execute(new Runnable() { - @Override public void run() { - try { - writeWindowUpdate(streamId, deltaWindowSize); - } catch (IOException ignored) { - } - } - }); + executor.submit( + new NamedRunnable(String.format("Spdy Writer %s stream %d", hostName, streamId)) { + @Override public void execute() { + try { + writeWindowUpdate(streamId, deltaWindowSize); + } catch (IOException ignored) { + } + } + }); } void writeWindowUpdate(int streamId, int deltaWindowSize) throws IOException { @@ -256,11 +253,11 @@ public final class SpdyConnection implements Closeable { return ping; } - private void writePingLater(final int id, final Ping ping) { - writeExecutor.execute(new Runnable() { - @Override public void run() { + private void writePingLater(final int streamId, final Ping ping) { + executor.submit(new NamedRunnable(String.format("Spdy Writer %s ping %d", hostName, streamId)) { + @Override public void execute() { try { - writePing(id, ping); + writePing(streamId, ping); } catch (IOException ignored) { } } @@ -290,17 +287,6 @@ public final class SpdyConnection implements Closeable { } } - private void shutdownLater(final int statusCode) { - writeExecutor.execute(new Runnable() { - @Override public void run() { - try { - shutdown(statusCode); - } catch (IOException ignored) { - } - } - }); - } - /** * Degrades this connection such that new streams can neither be created * locally, nor accepted from the remote peer. Existing streams are not @@ -372,9 +358,6 @@ public final class SpdyConnection implements Closeable { } } - writeExecutor.shutdown(); - callbackExecutor.shutdown(); - readExecutor.shutdown(); try { spdyReader.close(); } catch (IOException e) { @@ -385,28 +368,39 @@ public final class SpdyConnection implements Closeable { } catch (IOException e) { if (thrown == null) thrown = e; } + if (thrown != null) throw thrown; } public static class Builder { + private String hostName; private InputStream in; private OutputStream out; private IncomingStreamHandler handler = IncomingStreamHandler.REFUSE_INCOMING_STREAMS; public boolean client; + public Builder(boolean client, Socket socket) throws IOException { + this("", client, socket.getInputStream(), socket.getOutputStream()); + } + + public Builder(boolean client, InputStream in, OutputStream out) { + this("", client, in, out); + } + /** * @param client true if this peer initiated the connection; false if * this peer accepted the connection. */ - public Builder(boolean client, Socket socket) throws IOException { - this(client, socket.getInputStream(), socket.getOutputStream()); + public Builder(String hostName, boolean client, Socket socket) throws IOException { + this(hostName, client, socket.getInputStream(), socket.getOutputStream()); } /** * @param client true if this peer initiated the connection; false if this * peer accepted the connection. */ - public Builder(boolean client, InputStream in, OutputStream out) { + public Builder(String hostName, boolean client, InputStream in, OutputStream out) { + this.hostName = hostName; this.client = client; this.in = in; this.out = out; @@ -476,8 +470,10 @@ public final class SpdyConnection implements Closeable { removeStream(streamId); return; } - callbackExecutor.execute(new Runnable() { - @Override public void run() { + + executor.submit( + new NamedRunnable(String.format("Callback %s stream %d", hostName, streamId)) { + @Override public void execute() { try { handler.receive(synStream); } catch (IOException e) { diff --git a/src/test/java/com/squareup/okhttp/internal/spdy/MockSpdyPeer.java b/src/test/java/com/squareup/okhttp/internal/spdy/MockSpdyPeer.java index 54e058bcd..bc2088c38 100644 --- a/src/test/java/com/squareup/okhttp/internal/spdy/MockSpdyPeer.java +++ b/src/test/java/com/squareup/okhttp/internal/spdy/MockSpdyPeer.java @@ -32,6 +32,8 @@ import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import static java.util.concurrent.Executors.defaultThreadFactory; + /** Replays prerecorded outgoing frames and records incoming frames. */ public final class MockSpdyPeer implements Closeable { private int frameCount = 0; @@ -40,8 +42,7 @@ public final class MockSpdyPeer implements Closeable { private final List outFrames = new ArrayList(); private final BlockingQueue inFrames = new LinkedBlockingQueue(); private int port; - private final Executor executor = - Executors.newCachedThreadPool(Util.newThreadFactory("MockSpdyPeer", true)); + private final Executor executor = Executors.newCachedThreadPool(defaultThreadFactory()); private ServerSocket serverSocket; private Socket socket;