1
0
mirror of https://github.com/square/okhttp.git synced 2026-01-25 16:01:38 +03:00

Merge pull request #118 from square/marcelo/share_spdy_executors

Share thread executors in SpdyConnections
This commit is contained in:
Jesse Wilson
2013-02-14 13:03:39 -08:00
5 changed files with 95 additions and 70 deletions

View File

@@ -164,7 +164,7 @@ public final class Connection implements Closeable {
if (modernTls && (selectedProtocol = platform.getNpnSelectedProtocol(sslSocket)) != null) {
if (Arrays.equals(selectedProtocol, SPDY3)) {
sslSocket.setSoTimeout(0); // SPDY timeouts are set per-stream.
spdyConnection = new SpdyConnection.Builder(true, in, out).build();
spdyConnection = new SpdyConnection.Builder(address.getUriHost(), true, in, out).build();
} else if (!Arrays.equals(selectedProtocol, HTTP_11)) {
throw new IOException(
"Unexpected NPN transport " + new String(selectedProtocol, "ISO-8859-1"));

View File

@@ -0,0 +1,40 @@
/*
* Copyright (C) 2013 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.squareup.okhttp.internal;
/**
* Runnable implementation which always sets its thread name.
*/
public abstract class NamedRunnable implements Runnable {
private String name;
public NamedRunnable(String name) {
this.name = name;
}
@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
}

View File

@@ -189,9 +189,7 @@ public final class Util {
out.write(buffer);
}
/**
* Fills 'dst' with bytes from 'in', throwing EOFException if insufficient bytes are available.
*/
/** Fills 'dst' with bytes from 'in', throwing EOFException if insufficient bytes are available. */
public static void readFully(InputStream in, byte[] dst) throws IOException {
readFully(in, dst, 0, dst.length);
}
@@ -323,14 +321,4 @@ public final class Util {
}
return result.toString();
}
public static ThreadFactory newThreadFactory(final String name, final boolean daemon) {
return new ThreadFactory() {
@Override public Thread newThread(Runnable r) {
Thread result = new Thread(r, name);
result.setDaemon(daemon);
return result;
}
};
}
}

View File

@@ -16,6 +16,7 @@
package com.squareup.okhttp.internal.spdy;
import com.squareup.okhttp.internal.NamedRunnable;
import com.squareup.okhttp.internal.Util;
import java.io.Closeable;
import java.io.IOException;
@@ -27,11 +28,12 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.Executors.defaultThreadFactory;
/**
* A socket connection to a remote peer. A connection hosts streams which can
* send and receive data.
@@ -75,6 +77,10 @@ public final class SpdyConnection implements Closeable {
static final int GOAWAY_PROTOCOL_ERROR = 1;
static final int GOAWAY_INTERNAL_ERROR = 2;
private static final ExecutorService executor =
new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), defaultThreadFactory());
/** True if this peer initiated the connection. */
final boolean client;
@@ -83,14 +89,11 @@ public final class SpdyConnection implements Closeable {
* run on the callback executor.
*/
private final IncomingStreamHandler handler;
private final SpdyReader spdyReader;
private final SpdyWriter spdyWriter;
private final ExecutorService readExecutor;
private final ExecutorService writeExecutor;
private final ExecutorService callbackExecutor;
private final Map<Integer, SpdyStream> streams = new HashMap<Integer, SpdyStream>();
private final String hostName;
private int lastGoodStreamId;
private int nextStreamId;
private boolean shutdown;
@@ -111,17 +114,9 @@ public final class SpdyConnection implements Closeable {
nextStreamId = builder.client ? 1 : 2;
nextPingId = builder.client ? 1 : 2;
String prefix = builder.client ? "Spdy Client " : "Spdy Server ";
readExecutor =
new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
Util.newThreadFactory(prefix + "Reader", false));
writeExecutor =
new ThreadPoolExecutor(0, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
Util.newThreadFactory(prefix + "Writer", false));
callbackExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.newThreadFactory(prefix + "Callbacks", false));
hostName = builder.hostName;
readExecutor.execute(new Reader());
new Thread(new Reader(), "Spdy Reader " + hostName).start();
}
/**
@@ -207,14 +202,15 @@ public final class SpdyConnection implements Closeable {
}
void writeSynResetLater(final int streamId, final int statusCode) {
writeExecutor.execute(new Runnable() {
@Override public void run() {
try {
writeSynReset(streamId, statusCode);
} catch (IOException ignored) {
}
}
});
executor.submit(
new NamedRunnable(String.format("Spdy Writer %s stream %d", hostName, streamId)) {
@Override public void execute() {
try {
writeSynReset(streamId, statusCode);
} catch (IOException ignored) {
}
}
});
}
void writeSynReset(int streamId, int statusCode) throws IOException {
@@ -222,14 +218,15 @@ public final class SpdyConnection implements Closeable {
}
void writeWindowUpdateLater(final int streamId, final int deltaWindowSize) {
writeExecutor.execute(new Runnable() {
@Override public void run() {
try {
writeWindowUpdate(streamId, deltaWindowSize);
} catch (IOException ignored) {
}
}
});
executor.submit(
new NamedRunnable(String.format("Spdy Writer %s stream %d", hostName, streamId)) {
@Override public void execute() {
try {
writeWindowUpdate(streamId, deltaWindowSize);
} catch (IOException ignored) {
}
}
});
}
void writeWindowUpdate(int streamId, int deltaWindowSize) throws IOException {
@@ -256,11 +253,11 @@ public final class SpdyConnection implements Closeable {
return ping;
}
private void writePingLater(final int id, final Ping ping) {
writeExecutor.execute(new Runnable() {
@Override public void run() {
private void writePingLater(final int streamId, final Ping ping) {
executor.submit(new NamedRunnable(String.format("Spdy Writer %s ping %d", hostName, streamId)) {
@Override public void execute() {
try {
writePing(id, ping);
writePing(streamId, ping);
} catch (IOException ignored) {
}
}
@@ -290,17 +287,6 @@ public final class SpdyConnection implements Closeable {
}
}
private void shutdownLater(final int statusCode) {
writeExecutor.execute(new Runnable() {
@Override public void run() {
try {
shutdown(statusCode);
} catch (IOException ignored) {
}
}
});
}
/**
* Degrades this connection such that new streams can neither be created
* locally, nor accepted from the remote peer. Existing streams are not
@@ -372,9 +358,6 @@ public final class SpdyConnection implements Closeable {
}
}
writeExecutor.shutdown();
callbackExecutor.shutdown();
readExecutor.shutdown();
try {
spdyReader.close();
} catch (IOException e) {
@@ -385,28 +368,39 @@ public final class SpdyConnection implements Closeable {
} catch (IOException e) {
if (thrown == null) thrown = e;
}
if (thrown != null) throw thrown;
}
public static class Builder {
private String hostName;
private InputStream in;
private OutputStream out;
private IncomingStreamHandler handler = IncomingStreamHandler.REFUSE_INCOMING_STREAMS;
public boolean client;
public Builder(boolean client, Socket socket) throws IOException {
this("", client, socket.getInputStream(), socket.getOutputStream());
}
public Builder(boolean client, InputStream in, OutputStream out) {
this("", client, in, out);
}
/**
* @param client true if this peer initiated the connection; false if
* this peer accepted the connection.
*/
public Builder(boolean client, Socket socket) throws IOException {
this(client, socket.getInputStream(), socket.getOutputStream());
public Builder(String hostName, boolean client, Socket socket) throws IOException {
this(hostName, client, socket.getInputStream(), socket.getOutputStream());
}
/**
* @param client true if this peer initiated the connection; false if this
* peer accepted the connection.
*/
public Builder(boolean client, InputStream in, OutputStream out) {
public Builder(String hostName, boolean client, InputStream in, OutputStream out) {
this.hostName = hostName;
this.client = client;
this.in = in;
this.out = out;
@@ -476,8 +470,10 @@ public final class SpdyConnection implements Closeable {
removeStream(streamId);
return;
}
callbackExecutor.execute(new Runnable() {
@Override public void run() {
executor.submit(
new NamedRunnable(String.format("Callback %s stream %d", hostName, streamId)) {
@Override public void execute() {
try {
handler.receive(synStream);
} catch (IOException e) {

View File

@@ -32,6 +32,8 @@ import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import static java.util.concurrent.Executors.defaultThreadFactory;
/** Replays prerecorded outgoing frames and records incoming frames. */
public final class MockSpdyPeer implements Closeable {
private int frameCount = 0;
@@ -40,8 +42,7 @@ public final class MockSpdyPeer implements Closeable {
private final List<OutFrame> outFrames = new ArrayList<OutFrame>();
private final BlockingQueue<InFrame> inFrames = new LinkedBlockingQueue<InFrame>();
private int port;
private final Executor executor =
Executors.newCachedThreadPool(Util.newThreadFactory("MockSpdyPeer", true));
private final Executor executor = Executors.newCachedThreadPool(defaultThreadFactory());
private ServerSocket serverSocket;
private Socket socket;