mirror of
https://github.com/square/okhttp.git
synced 2026-01-22 15:42:00 +03:00
Merge pull request #533 from square/jwilson_0215_rethink_recycle
Move connection pooling logic.
This commit is contained in:
@@ -65,7 +65,7 @@ import static java.net.HttpURLConnection.HTTP_PROXY_AUTH;
|
||||
* should the attempt fail.
|
||||
*/
|
||||
public final class Connection implements Closeable {
|
||||
|
||||
private final ConnectionPool pool;
|
||||
private final Route route;
|
||||
|
||||
private Socket socket;
|
||||
@@ -78,7 +78,8 @@ public final class Connection implements Closeable {
|
||||
private long idleStartTimeNs;
|
||||
private Handshake handshake;
|
||||
|
||||
public Connection(Route route) {
|
||||
public Connection(ConnectionPool pool, Route route) {
|
||||
this.pool = pool;
|
||||
this.route = route;
|
||||
}
|
||||
|
||||
@@ -96,7 +97,7 @@ public final class Connection implements Closeable {
|
||||
upgradeToTls(tunnelRequest);
|
||||
} else {
|
||||
streamWrapper();
|
||||
httpConnection = new HttpConnection(in, out);
|
||||
httpConnection = new HttpConnection(pool, this, in, out);
|
||||
}
|
||||
connected = true;
|
||||
}
|
||||
@@ -164,7 +165,7 @@ public final class Connection implements Closeable {
|
||||
.protocol(selectedProtocol).build();
|
||||
spdyConnection.sendConnectionHeader();
|
||||
} else {
|
||||
httpConnection = new HttpConnection(in, out);
|
||||
httpConnection = new HttpConnection(pool, this, in, out);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -306,7 +307,7 @@ public final class Connection implements Closeable {
|
||||
* retried if the proxy requires authorization.
|
||||
*/
|
||||
private void makeTunnel(TunnelRequest tunnelRequest) throws IOException {
|
||||
HttpConnection tunnelConnection = new HttpConnection(in, out);
|
||||
HttpConnection tunnelConnection = new HttpConnection(pool, this, in, out);
|
||||
Request request = tunnelRequest.getRequest();
|
||||
String requestLine = tunnelRequest.requestLine();
|
||||
while (true) {
|
||||
|
||||
@@ -244,12 +244,9 @@ public class ConnectionPool {
|
||||
* Shares the SPDY connection with the pool. Callers to this method may
|
||||
* continue to use {@code connection}.
|
||||
*/
|
||||
public void maybeShare(Connection connection) {
|
||||
public void share(Connection connection) {
|
||||
if (!connection.isSpdy()) throw new IllegalArgumentException();
|
||||
executorService.execute(connectionsCleanupRunnable);
|
||||
if (!connection.isSpdy()) {
|
||||
// Only SPDY connections are sharable.
|
||||
return;
|
||||
}
|
||||
if (connection.isAlive()) {
|
||||
synchronized (this) {
|
||||
connections.addFirst(connection);
|
||||
|
||||
@@ -80,7 +80,7 @@ final class Job extends NamedRunnable {
|
||||
.exception(e)
|
||||
.build());
|
||||
} finally {
|
||||
engine.release(true); // Release the connection if it isn't already released.
|
||||
engine.close(); // Close the connection if it isn't already.
|
||||
dispatcher.finished(this);
|
||||
}
|
||||
}
|
||||
@@ -142,7 +142,7 @@ final class Job extends NamedRunnable {
|
||||
Request redirect = processResponse(engine, response);
|
||||
|
||||
if (redirect == null) {
|
||||
engine.automaticallyReleaseConnectionToPool();
|
||||
engine.releaseConnection();
|
||||
return response.newBuilder()
|
||||
.body(new RealResponseBody(response, engine.getResponseBody()))
|
||||
.redirectedBy(redirectedBy)
|
||||
@@ -150,11 +150,10 @@ final class Job extends NamedRunnable {
|
||||
}
|
||||
|
||||
if (!sameConnection(request, redirect)) {
|
||||
engine.automaticallyReleaseConnectionToPool();
|
||||
engine.releaseConnection();
|
||||
}
|
||||
|
||||
engine.release(false);
|
||||
Connection connection = engine.getConnection();
|
||||
Connection connection = engine.close();
|
||||
redirectedBy = response.newBuilder().redirectedBy(redirectedBy).build(); // Chained.
|
||||
request = redirect;
|
||||
engine = new HttpEngine(client, request, false, connection, null, null);
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
package com.squareup.okhttp.internal.http;
|
||||
|
||||
import com.squareup.okhttp.Connection;
|
||||
import com.squareup.okhttp.ConnectionPool;
|
||||
import com.squareup.okhttp.Headers;
|
||||
import com.squareup.okhttp.Protocol;
|
||||
import com.squareup.okhttp.Response;
|
||||
@@ -61,16 +62,59 @@ public final class HttpConnection {
|
||||
private static final int STATE_READING_RESPONSE_BODY = 5;
|
||||
private static final int STATE_CLOSED = 6;
|
||||
|
||||
private static final int ON_IDLE_HOLD = 0;
|
||||
private static final int ON_IDLE_POOL = 1;
|
||||
private static final int ON_IDLE_CLOSE = 2;
|
||||
|
||||
private final ConnectionPool pool;
|
||||
private final Connection connection;
|
||||
private final InputStream in;
|
||||
private final OutputStream out;
|
||||
|
||||
private int state = STATE_IDLE;
|
||||
private int onIdle = ON_IDLE_HOLD;
|
||||
|
||||
public HttpConnection(InputStream in, OutputStream out) {
|
||||
public HttpConnection(ConnectionPool pool, Connection connection, InputStream in,
|
||||
OutputStream out) {
|
||||
this.pool = pool;
|
||||
this.connection = connection;
|
||||
this.in = in;
|
||||
this.out = out;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure this connection to put itself back into the connection pool when
|
||||
* the HTTP response body is exhausted.
|
||||
*/
|
||||
public void poolOnIdle() {
|
||||
onIdle = ON_IDLE_POOL;
|
||||
|
||||
// If we're already idle, go to the pool immediately.
|
||||
if (state == STATE_IDLE) {
|
||||
onIdle = ON_IDLE_HOLD; // Set the on idle policy back to the default.
|
||||
pool.recycle(connection);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure this connection to close itself when the HTTP response body is
|
||||
* exhausted.
|
||||
*/
|
||||
public void closeOnIdle() throws IOException {
|
||||
onIdle = ON_IDLE_CLOSE;
|
||||
|
||||
// If we're already idle, close immediately.
|
||||
if (state == STATE_IDLE) {
|
||||
state = STATE_CLOSED;
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
|
||||
/** Returns true if this connection is closed. */
|
||||
public boolean isClosed() {
|
||||
return state == STATE_CLOSED;
|
||||
}
|
||||
|
||||
public void flush() throws IOException {
|
||||
out.flush();
|
||||
}
|
||||
@@ -126,11 +170,8 @@ public final class HttpConnection {
|
||||
* cannot be cached unless it is consumed completely) or to enable connection
|
||||
* reuse.
|
||||
*/
|
||||
private static boolean discardStream(HttpEngine httpEngine, InputStream responseBodyIn) {
|
||||
Connection connection = httpEngine.getConnection();
|
||||
if (connection == null) return false;
|
||||
public boolean discard(InputStream responseBodyIn) {
|
||||
Socket socket = connection.getSocket();
|
||||
if (socket == null) return false;
|
||||
try {
|
||||
int socketTimeout = socket.getSoTimeout();
|
||||
socket.setSoTimeout(Transport.DISCARD_STREAM_TIMEOUT_MILLIS);
|
||||
@@ -163,38 +204,33 @@ public final class HttpConnection {
|
||||
requestBody.writeToSocket(out);
|
||||
}
|
||||
|
||||
public InputStream newFixedLengthInputStream(
|
||||
CacheRequest cacheRequest, HttpEngine httpEngine, long length) throws IOException {
|
||||
public InputStream newFixedLengthInputStream(CacheRequest cacheRequest, long length)
|
||||
throws IOException {
|
||||
if (state != STATE_OPEN_RESPONSE_BODY) throw new IllegalStateException("state: " + state);
|
||||
state = STATE_READING_RESPONSE_BODY;
|
||||
return new FixedLengthInputStream(in, cacheRequest, httpEngine, length);
|
||||
return new FixedLengthInputStream(cacheRequest, length);
|
||||
}
|
||||
|
||||
/**
|
||||
* Call this to advance past a response body for HTTP responses that do not
|
||||
* have a response body.
|
||||
*/
|
||||
public void emptyResponseBody() {
|
||||
if (state != STATE_OPEN_RESPONSE_BODY) throw new IllegalStateException("state: " + state);
|
||||
state = STATE_IDLE;
|
||||
public void emptyResponseBody() throws IOException {
|
||||
newFixedLengthInputStream(null, 0L); // Transition to STATE_IDLE.
|
||||
}
|
||||
|
||||
public InputStream newChunkedInputStream(
|
||||
CacheRequest cacheRequest, HttpEngine httpEngine) throws IOException {
|
||||
if (state != STATE_OPEN_RESPONSE_BODY) throw new IllegalStateException("state: " + state);
|
||||
state = STATE_READING_RESPONSE_BODY;
|
||||
return new ChunkedInputStream(in, cacheRequest, httpEngine);
|
||||
return new ChunkedInputStream(cacheRequest, httpEngine);
|
||||
}
|
||||
|
||||
public InputStream newUnknownLengthInputStream(CacheRequest cacheRequest, HttpEngine httpEngine)
|
||||
throws IOException {
|
||||
if (state != STATE_OPEN_RESPONSE_BODY) throw new IllegalStateException("state: " + state);
|
||||
state = STATE_READING_RESPONSE_BODY;
|
||||
return new UnknownLengthHttpInputStream(in, cacheRequest, httpEngine);
|
||||
}
|
||||
|
||||
public boolean discard(HttpEngine httpEngine, InputStream responseBodyIn) {
|
||||
return discardStream(httpEngine, responseBodyIn);
|
||||
return new UnknownLengthHttpInputStream(cacheRequest);
|
||||
}
|
||||
|
||||
/** An HTTP body with a fixed length known in advance. */
|
||||
@@ -347,17 +383,11 @@ public final class HttpConnection {
|
||||
}
|
||||
|
||||
private class AbstractHttpInputStream extends InputStream {
|
||||
protected final InputStream in;
|
||||
protected final HttpEngine httpEngine;
|
||||
private final CacheRequest cacheRequest;
|
||||
protected final OutputStream cacheBody;
|
||||
protected boolean closed;
|
||||
|
||||
AbstractHttpInputStream(InputStream in, HttpEngine httpEngine, CacheRequest cacheRequest)
|
||||
throws IOException {
|
||||
this.in = in;
|
||||
this.httpEngine = httpEngine;
|
||||
|
||||
AbstractHttpInputStream(CacheRequest cacheRequest) throws IOException {
|
||||
OutputStream cacheBody = cacheRequest != null ? cacheRequest.getBody() : null;
|
||||
|
||||
// Some apps return a null body; for compatibility we treat that like a null cache request.
|
||||
@@ -378,9 +408,7 @@ public final class HttpConnection {
|
||||
}
|
||||
|
||||
protected final void checkNotClosed() throws IOException {
|
||||
if (closed) {
|
||||
throw new IOException("stream closed");
|
||||
}
|
||||
if (closed) throw new IOException("stream closed");
|
||||
}
|
||||
|
||||
protected final void cacheWrite(byte[] buffer, int offset, int count) throws IOException {
|
||||
@@ -395,11 +423,19 @@ public final class HttpConnection {
|
||||
*/
|
||||
protected final void endOfInput() throws IOException {
|
||||
if (state != STATE_READING_RESPONSE_BODY) throw new IllegalStateException("state: " + state);
|
||||
|
||||
if (cacheRequest != null) {
|
||||
cacheBody.close();
|
||||
}
|
||||
httpEngine.release(false);
|
||||
|
||||
state = STATE_IDLE;
|
||||
if (onIdle == ON_IDLE_POOL) {
|
||||
onIdle = ON_IDLE_HOLD; // Set the on idle policy back to the default.
|
||||
pool.recycle(connection);
|
||||
} else if (onIdle == ON_IDLE_CLOSE) {
|
||||
state = STATE_CLOSED;
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -418,7 +454,7 @@ public final class HttpConnection {
|
||||
if (cacheRequest != null) {
|
||||
cacheRequest.abort();
|
||||
}
|
||||
httpEngine.release(true);
|
||||
Util.closeQuietly(connection);
|
||||
state = STATE_CLOSED;
|
||||
}
|
||||
}
|
||||
@@ -427,9 +463,8 @@ public final class HttpConnection {
|
||||
private class FixedLengthInputStream extends AbstractHttpInputStream {
|
||||
private long bytesRemaining;
|
||||
|
||||
public FixedLengthInputStream(InputStream is, CacheRequest cacheRequest, HttpEngine httpEngine,
|
||||
long length) throws IOException {
|
||||
super(is, httpEngine, cacheRequest);
|
||||
public FixedLengthInputStream(CacheRequest cacheRequest, long length) throws IOException {
|
||||
super(cacheRequest);
|
||||
bytesRemaining = length;
|
||||
if (bytesRemaining == 0) {
|
||||
endOfInput();
|
||||
@@ -464,7 +499,7 @@ public final class HttpConnection {
|
||||
if (closed) {
|
||||
return;
|
||||
}
|
||||
if (bytesRemaining != 0 && !discardStream(httpEngine, this)) {
|
||||
if (bytesRemaining != 0 && !discard(this)) {
|
||||
unexpectedEndOfInput();
|
||||
}
|
||||
closed = true;
|
||||
@@ -476,10 +511,11 @@ public final class HttpConnection {
|
||||
private static final int NO_CHUNK_YET = -1;
|
||||
private int bytesRemainingInChunk = NO_CHUNK_YET;
|
||||
private boolean hasMoreChunks = true;
|
||||
private final HttpEngine httpEngine;
|
||||
|
||||
ChunkedInputStream(InputStream is, CacheRequest cacheRequest, HttpEngine httpEngine)
|
||||
throws IOException {
|
||||
super(is, httpEngine, cacheRequest);
|
||||
ChunkedInputStream(CacheRequest cacheRequest, HttpEngine httpEngine) throws IOException {
|
||||
super(cacheRequest);
|
||||
this.httpEngine = httpEngine;
|
||||
}
|
||||
|
||||
@Override public int read(byte[] buffer, int offset, int count) throws IOException {
|
||||
@@ -541,7 +577,7 @@ public final class HttpConnection {
|
||||
if (closed) {
|
||||
return;
|
||||
}
|
||||
if (hasMoreChunks && !discardStream(httpEngine, this)) {
|
||||
if (hasMoreChunks && !discard(this)) {
|
||||
unexpectedEndOfInput();
|
||||
}
|
||||
closed = true;
|
||||
@@ -552,9 +588,8 @@ public final class HttpConnection {
|
||||
class UnknownLengthHttpInputStream extends AbstractHttpInputStream {
|
||||
private boolean inputExhausted;
|
||||
|
||||
UnknownLengthHttpInputStream(InputStream in, CacheRequest cacheRequest, HttpEngine httpEngine)
|
||||
throws IOException {
|
||||
super(in, httpEngine, cacheRequest);
|
||||
UnknownLengthHttpInputStream(CacheRequest cacheRequest) throws IOException {
|
||||
super(cacheRequest);
|
||||
}
|
||||
|
||||
@Override public int read(byte[] buffer, int offset, int count) throws IOException {
|
||||
|
||||
@@ -68,11 +68,6 @@ import static java.net.HttpURLConnection.HTTP_NO_CONTENT;
|
||||
*
|
||||
* <p>The request and response may be served by the HTTP response cache, by the
|
||||
* network, or by both in the event of a conditional GET.
|
||||
*
|
||||
* <p>This class may hold a socket connection that needs to be released or
|
||||
* recycled. By default, this socket connection is held when the last byte of
|
||||
* the response is consumed. To release the connection when it is no longer
|
||||
* required, use {@link #automaticallyReleaseConnectionToPool()}.
|
||||
*/
|
||||
public class HttpEngine {
|
||||
final OkHttpClient client;
|
||||
@@ -121,15 +116,6 @@ public class HttpEngine {
|
||||
/** The cache request currently being populated from a network response. */
|
||||
private CacheRequest cacheRequest;
|
||||
|
||||
/**
|
||||
* True if the socket connection should be released to the connection pool
|
||||
* when the response has been fully read.
|
||||
*/
|
||||
private boolean automaticallyReleaseConnectionToPool;
|
||||
|
||||
/** True if the socket connection is no longer needed by this engine. */
|
||||
private boolean connectionReleased;
|
||||
|
||||
/**
|
||||
* @param request the HTTP request without a body. The body must be
|
||||
* written via the engine's request body stream.
|
||||
@@ -199,9 +185,10 @@ public class HttpEngine {
|
||||
}
|
||||
|
||||
} else {
|
||||
// We're using a cached response. Close the connection we may have inherited from a redirect.
|
||||
// We're using a cached response. Recycle a connection we may have inherited from a redirect.
|
||||
if (connection != null) {
|
||||
disconnect();
|
||||
client.getConnectionPool().recycle(connection);
|
||||
connection = null;
|
||||
}
|
||||
|
||||
// No need for the network! Promote the cached response immediately.
|
||||
@@ -243,7 +230,7 @@ public class HttpEngine {
|
||||
|
||||
if (!connection.isConnected()) {
|
||||
connection.connect(client.getConnectTimeout(), client.getReadTimeout(), getTunnelConfig());
|
||||
client.getConnectionPool().maybeShare(connection);
|
||||
if (connection.isSpdy()) client.getConnectionPool().share(connection);
|
||||
client.getRoutesDatabase().connected(connection.getRoute());
|
||||
} else if (!connection.isSpdy()) {
|
||||
connection.updateReadTimeout(client.getReadTimeout());
|
||||
@@ -252,15 +239,6 @@ public class HttpEngine {
|
||||
route = connection.getRoute();
|
||||
}
|
||||
|
||||
/**
|
||||
* Recycle the connection to the origin server. It is an error to call this
|
||||
* with a request in flight.
|
||||
*/
|
||||
private void disconnect() {
|
||||
client.getConnectionPool().recycle(connection);
|
||||
connection = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called immediately before the transport transmits HTTP request headers.
|
||||
* This is used to observe the sent time should the request be cached.
|
||||
@@ -324,10 +302,10 @@ public class HttpEngine {
|
||||
return null;
|
||||
}
|
||||
|
||||
release(true);
|
||||
Connection connection = close();
|
||||
|
||||
// For failure recovery, use the same route selector with a new connection.
|
||||
return new HttpEngine(client, request, bufferRequestBody, null, routeSelector,
|
||||
return new HttpEngine(client, request, bufferRequestBody, connection, routeSelector,
|
||||
(RetryableOutputStream) requestBodyOut);
|
||||
}
|
||||
|
||||
@@ -363,42 +341,42 @@ public class HttpEngine {
|
||||
}
|
||||
|
||||
/**
|
||||
* Cause the socket connection to be released to the connection pool when
|
||||
* it is no longer needed. If it is already unneeded, it will be pooled
|
||||
* immediately. Otherwise the connection is held so that redirects can be
|
||||
* handled by the same connection.
|
||||
* Configure the socket connection to be either pooled or closed when it is
|
||||
* either exhausted or closed. If it is unneeded when this is called, it will
|
||||
* be released immediately.
|
||||
*/
|
||||
public final void automaticallyReleaseConnectionToPool() {
|
||||
automaticallyReleaseConnectionToPool = true;
|
||||
if (connection != null && connectionReleased) {
|
||||
disconnect();
|
||||
public final void releaseConnection() throws IOException {
|
||||
if (transport != null) {
|
||||
transport.releaseConnectionOnIdle();
|
||||
}
|
||||
connection = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Releases this engine so that its resources may be either reused or
|
||||
* closed. Also call {@link #automaticallyReleaseConnectionToPool} unless
|
||||
* the connection will be used to follow a redirect.
|
||||
* Release any resources held by this engine. If a connection is still held by
|
||||
* this engine, it is returned.
|
||||
*/
|
||||
public final void release(boolean streamCanceled) {
|
||||
// If the response body comes from the cache, close it.
|
||||
if (validatingResponse != null
|
||||
&& validatingResponse.body() != null
|
||||
&& responseBodyIn == validatingResponse.body().byteStream()) {
|
||||
closeQuietly(responseBodyIn);
|
||||
public final Connection close() {
|
||||
// If this engine never achieved a response body, its connection cannot be reused.
|
||||
if (responseBodyIn == null) {
|
||||
closeQuietly(connection);
|
||||
connection = null;
|
||||
return null;
|
||||
}
|
||||
|
||||
if (connection != null && !connectionReleased) {
|
||||
connectionReleased = true;
|
||||
// Close the response body. This will recycle the connection if it is eligible.
|
||||
closeQuietly(responseBodyIn);
|
||||
|
||||
if (transport == null
|
||||
|| !transport.makeReusable(streamCanceled, requestBodyOut, responseTransferIn)) {
|
||||
closeQuietly(connection);
|
||||
connection = null;
|
||||
} else if (automaticallyReleaseConnectionToPool) {
|
||||
disconnect();
|
||||
}
|
||||
// Close the connection if it cannot be reused.
|
||||
if (transport != null && !transport.canReuseConnection()) {
|
||||
closeQuietly(connection);
|
||||
connection = null;
|
||||
return null;
|
||||
}
|
||||
|
||||
Connection result = connection;
|
||||
connection = null;
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -553,7 +531,7 @@ public class HttpEngine {
|
||||
if (responseSource == ResponseSource.CONDITIONAL_CACHE) {
|
||||
if (validatingResponse.validate(response)) {
|
||||
transport.emptyTransferStream();
|
||||
release(false);
|
||||
releaseConnection();
|
||||
response = combine(validatingResponse, response);
|
||||
|
||||
// Update the cache after combining headers but before stripping the
|
||||
|
||||
@@ -18,7 +18,6 @@ package com.squareup.okhttp.internal.http;
|
||||
|
||||
import com.squareup.okhttp.Request;
|
||||
import com.squareup.okhttp.Response;
|
||||
import com.squareup.okhttp.internal.AbstractOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
@@ -104,46 +103,39 @@ public final class HttpTransport implements Transport {
|
||||
return httpConnection.readResponse();
|
||||
}
|
||||
|
||||
public boolean makeReusable(boolean streamCanceled, OutputStream requestBodyOut,
|
||||
InputStream responseBodyIn) {
|
||||
if (streamCanceled) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// We cannot reuse sockets that have incomplete output.
|
||||
if (requestBodyOut != null && !((AbstractOutputStream) requestBodyOut).isClosed()) {
|
||||
return false;
|
||||
@Override public void releaseConnectionOnIdle() throws IOException {
|
||||
if (canReuseConnection()) {
|
||||
httpConnection.poolOnIdle();
|
||||
} else {
|
||||
httpConnection.closeOnIdle();
|
||||
}
|
||||
}
|
||||
|
||||
@Override public boolean canReuseConnection() {
|
||||
// If the request specified that the connection shouldn't be reused, don't reuse it.
|
||||
if ("close".equalsIgnoreCase(httpEngine.getRequest().header("Connection"))) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// If the response specified that the connection shouldn't be reused, don't reuse it.
|
||||
if (httpEngine.getResponse() != null
|
||||
&& "close".equalsIgnoreCase(httpEngine.getResponse().header("Connection"))) {
|
||||
if ("close".equalsIgnoreCase(httpEngine.getResponse().header("Connection"))) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (responseBodyIn instanceof HttpConnection.UnknownLengthHttpInputStream) {
|
||||
if (httpConnection.isClosed()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (responseBodyIn != null) {
|
||||
return httpConnection.discard(httpEngine, responseBodyIn);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override public void emptyTransferStream() {
|
||||
@Override public void emptyTransferStream() throws IOException {
|
||||
httpConnection.emptyResponseBody();
|
||||
}
|
||||
|
||||
@Override public InputStream getTransferStream(CacheRequest cacheRequest) throws IOException {
|
||||
if (!httpEngine.hasResponseBody()) {
|
||||
return httpConnection.newFixedLengthInputStream(cacheRequest, httpEngine, 0);
|
||||
return httpConnection.newFixedLengthInputStream(cacheRequest, 0);
|
||||
}
|
||||
|
||||
if ("chunked".equalsIgnoreCase(httpEngine.getResponse().header("Transfer-Encoding"))) {
|
||||
@@ -152,7 +144,7 @@ public final class HttpTransport implements Transport {
|
||||
|
||||
long contentLength = OkHeaders.contentLength(httpEngine.getResponse());
|
||||
if (contentLength != -1) {
|
||||
return httpConnection.newFixedLengthInputStream(cacheRequest, httpEngine, contentLength);
|
||||
return httpConnection.newFixedLengthInputStream(cacheRequest, contentLength);
|
||||
}
|
||||
|
||||
// Wrap the input stream from the connection (rather than just returning
|
||||
|
||||
@@ -24,9 +24,9 @@ import com.squareup.okhttp.Protocol;
|
||||
import com.squareup.okhttp.Request;
|
||||
import com.squareup.okhttp.Response;
|
||||
import com.squareup.okhttp.Route;
|
||||
import com.squareup.okhttp.internal.bytes.ByteString;
|
||||
import com.squareup.okhttp.internal.Platform;
|
||||
import com.squareup.okhttp.internal.Util;
|
||||
import com.squareup.okhttp.internal.bytes.ByteString;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
@@ -101,15 +101,7 @@ public class HttpURLConnectionImpl extends HttpURLConnection {
|
||||
@Override public final void disconnect() {
|
||||
// Calling disconnect() before a connection exists should have no effect.
|
||||
if (httpEngine != null) {
|
||||
// We close the response body here instead of in
|
||||
// HttpEngine.release because that is called when input
|
||||
// has been completely read from the underlying socket.
|
||||
// However the response body can be a GZIPInputStream that
|
||||
// still has unread data.
|
||||
if (httpEngine.hasResponse()) {
|
||||
Util.closeQuietly(httpEngine.getResponseBody());
|
||||
}
|
||||
httpEngine.release(true);
|
||||
httpEngine.close();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -327,7 +319,7 @@ public class HttpURLConnectionImpl extends HttpURLConnection {
|
||||
|
||||
Retry retry = processResponseHeaders();
|
||||
if (retry == Retry.NONE) {
|
||||
httpEngine.automaticallyReleaseConnectionToPool();
|
||||
httpEngine.releaseConnection();
|
||||
return httpEngine;
|
||||
}
|
||||
|
||||
@@ -353,12 +345,11 @@ public class HttpURLConnectionImpl extends HttpURLConnection {
|
||||
}
|
||||
|
||||
if (retry == Retry.DIFFERENT_CONNECTION) {
|
||||
httpEngine.automaticallyReleaseConnectionToPool();
|
||||
httpEngine.releaseConnection();
|
||||
}
|
||||
|
||||
httpEngine.release(false);
|
||||
httpEngine = newHttpEngine(retryMethod, httpEngine.getConnection(),
|
||||
(RetryableOutputStream) requestBody);
|
||||
Connection connection = httpEngine.close();
|
||||
httpEngine = newHttpEngine(retryMethod, connection, (RetryableOutputStream) requestBody);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -117,7 +117,7 @@ public final class RouteSelector {
|
||||
if (!hasNextPostponed()) {
|
||||
throw new NoSuchElementException();
|
||||
}
|
||||
return new Connection(nextPostponed());
|
||||
return new Connection(pool, nextPostponed());
|
||||
}
|
||||
lastProxy = nextProxy();
|
||||
resetNextInetSocketAddress(lastProxy);
|
||||
@@ -135,7 +135,7 @@ public final class RouteSelector {
|
||||
return next(method);
|
||||
}
|
||||
|
||||
return new Connection(route);
|
||||
return new Connection(pool, route);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -195,9 +195,11 @@ public final class SpdyTransport implements Transport {
|
||||
return new SpdyInputStream(stream, cacheRequest, httpEngine);
|
||||
}
|
||||
|
||||
@Override public boolean makeReusable(boolean streamCanceled, OutputStream requestBodyOut,
|
||||
InputStream responseBodyIn) {
|
||||
return true; // SPDY sockets are always reusable.
|
||||
@Override public void releaseConnectionOnIdle() {
|
||||
}
|
||||
|
||||
@Override public boolean canReuseConnection() {
|
||||
return true; // TODO: spdyConnection.isClosed() ?
|
||||
}
|
||||
|
||||
/** When true, this header should not be emitted or consumed. */
|
||||
@@ -242,15 +244,12 @@ public final class SpdyTransport implements Transport {
|
||||
*/
|
||||
abstract static class AbstractHttpInputStream extends InputStream {
|
||||
protected final InputStream in;
|
||||
protected final HttpEngine httpEngine;
|
||||
private final CacheRequest cacheRequest;
|
||||
protected final OutputStream cacheBody;
|
||||
protected boolean closed;
|
||||
|
||||
AbstractHttpInputStream(InputStream in, HttpEngine httpEngine, CacheRequest cacheRequest)
|
||||
throws IOException {
|
||||
AbstractHttpInputStream(InputStream in, CacheRequest cacheRequest) throws IOException {
|
||||
this.in = in;
|
||||
this.httpEngine = httpEngine;
|
||||
|
||||
OutputStream cacheBody = cacheRequest != null ? cacheRequest.getBody() : null;
|
||||
|
||||
@@ -272,9 +271,7 @@ public final class SpdyTransport implements Transport {
|
||||
}
|
||||
|
||||
protected final void checkNotClosed() throws IOException {
|
||||
if (closed) {
|
||||
throw new IOException("stream closed");
|
||||
}
|
||||
if (closed) throw new IOException("stream closed");
|
||||
}
|
||||
|
||||
protected final void cacheWrite(byte[] buffer, int offset, int count) throws IOException {
|
||||
@@ -291,7 +288,6 @@ public final class SpdyTransport implements Transport {
|
||||
if (cacheRequest != null) {
|
||||
cacheBody.close();
|
||||
}
|
||||
httpEngine.release(false);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -310,7 +306,6 @@ public final class SpdyTransport implements Transport {
|
||||
if (cacheRequest != null) {
|
||||
cacheRequest.abort();
|
||||
}
|
||||
httpEngine.release(true);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -321,7 +316,7 @@ public final class SpdyTransport implements Transport {
|
||||
|
||||
SpdyInputStream(SpdyStream stream, CacheRequest cacheRequest, HttpEngine httpEngine)
|
||||
throws IOException {
|
||||
super(stream.getInputStream(), httpEngine, cacheRequest);
|
||||
super(stream.getInputStream(), cacheRequest);
|
||||
this.stream = stream;
|
||||
}
|
||||
|
||||
|
||||
@@ -65,12 +65,20 @@ interface Transport {
|
||||
Response.Builder readResponseHeaders() throws IOException;
|
||||
|
||||
/** Notify the transport that no response body will be read. */
|
||||
void emptyTransferStream();
|
||||
void emptyTransferStream() throws IOException;
|
||||
|
||||
// TODO: make this the content stream?
|
||||
InputStream getTransferStream(CacheRequest cacheRequest) throws IOException;
|
||||
|
||||
/** Returns true if the underlying connection can be recycled. */
|
||||
boolean makeReusable(boolean streamCanceled, OutputStream requestBodyOut,
|
||||
InputStream responseBodyIn);
|
||||
/**
|
||||
* Configures the response body to pool or close the socket connection when
|
||||
* the response body is closed.
|
||||
*/
|
||||
void releaseConnectionOnIdle() throws IOException;
|
||||
|
||||
/**
|
||||
* Returns true if the socket connection held by this transport can be reused
|
||||
* for a follow-up exchange.
|
||||
*/
|
||||
boolean canReuseConnection();
|
||||
}
|
||||
|
||||
@@ -35,6 +35,7 @@ import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertSame;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
public final class ConnectionPoolTest {
|
||||
private static final int KEEP_ALIVE_DURATION_MS = 5000;
|
||||
@@ -54,7 +55,6 @@ public final class ConnectionPoolTest {
|
||||
private Connection httpD;
|
||||
private Connection httpE;
|
||||
private Connection spdyA;
|
||||
private Connection spdyB;
|
||||
|
||||
@Before public void setUp() throws Exception {
|
||||
spdyServer.useHttps(sslContext.getSocketFactory(), false);
|
||||
@@ -74,20 +74,18 @@ public final class ConnectionPoolTest {
|
||||
|
||||
Route httpRoute = new Route(httpAddress, Proxy.NO_PROXY, httpSocketAddress, true);
|
||||
Route spdyRoute = new Route(spdyAddress, Proxy.NO_PROXY, spdySocketAddress, true);
|
||||
httpA = new Connection(httpRoute);
|
||||
httpA = new Connection(null, httpRoute);
|
||||
httpA.connect(200, 200, null);
|
||||
httpB = new Connection(httpRoute);
|
||||
httpB = new Connection(null, httpRoute);
|
||||
httpB.connect(200, 200, null);
|
||||
httpC = new Connection(httpRoute);
|
||||
httpC = new Connection(null, httpRoute);
|
||||
httpC.connect(200, 200, null);
|
||||
httpD = new Connection(httpRoute);
|
||||
httpD = new Connection(null, httpRoute);
|
||||
httpD.connect(200, 200, null);
|
||||
httpE = new Connection(httpRoute);
|
||||
httpE = new Connection(null, httpRoute);
|
||||
httpE.connect(200, 200, null);
|
||||
spdyA = new Connection(spdyRoute);
|
||||
spdyA.connect(200, 200, null);
|
||||
spdyB = new Connection(spdyRoute);
|
||||
spdyB.connect(200, 200, null);
|
||||
spdyA = new Connection(null, spdyRoute);
|
||||
spdyA.connect(20000, 20000, null);
|
||||
}
|
||||
|
||||
@After public void tearDown() throws Exception {
|
||||
@@ -100,7 +98,6 @@ public final class ConnectionPoolTest {
|
||||
Util.closeQuietly(httpD);
|
||||
Util.closeQuietly(httpE);
|
||||
Util.closeQuietly(spdyA);
|
||||
Util.closeQuietly(spdyB);
|
||||
}
|
||||
|
||||
@Test public void poolSingleHttpConnection() throws IOException {
|
||||
@@ -108,7 +105,8 @@ public final class ConnectionPoolTest {
|
||||
Connection connection = pool.get(httpAddress);
|
||||
assertNull(connection);
|
||||
|
||||
connection = new Connection(new Route(httpAddress, Proxy.NO_PROXY, httpSocketAddress, true));
|
||||
connection = new Connection(
|
||||
null, new Route(httpAddress, Proxy.NO_PROXY, httpSocketAddress, true));
|
||||
connection.connect(200, 200, null);
|
||||
assertEquals(0, pool.getConnectionCount());
|
||||
pool.recycle(connection);
|
||||
@@ -134,7 +132,7 @@ public final class ConnectionPoolTest {
|
||||
|
||||
@Test public void getSpdyConnection() throws Exception {
|
||||
ConnectionPool pool = new ConnectionPool(2, KEEP_ALIVE_DURATION_MS);
|
||||
pool.maybeShare(spdyA);
|
||||
pool.share(spdyA);
|
||||
assertSame(spdyA, pool.get(spdyAddress));
|
||||
assertPooled(pool, spdyA);
|
||||
}
|
||||
@@ -189,7 +187,7 @@ public final class ConnectionPoolTest {
|
||||
|
||||
@Test public void gettingSpdyConnectionPromotesItToFrontOfQueue() throws Exception {
|
||||
ConnectionPool pool = new ConnectionPool(2, KEEP_ALIVE_DURATION_MS);
|
||||
pool.maybeShare(spdyA);
|
||||
pool.share(spdyA);
|
||||
pool.recycle(httpA);
|
||||
assertPooled(pool, httpA, spdyA);
|
||||
assertSame(spdyA, pool.get(spdyAddress));
|
||||
@@ -210,9 +208,13 @@ public final class ConnectionPoolTest {
|
||||
assertTrue(httpA.getSocket().isClosed());
|
||||
}
|
||||
|
||||
@Test public void shareHttpConnectionDoesNothing() throws Exception {
|
||||
@Test public void shareHttpConnectionFails() throws Exception {
|
||||
ConnectionPool pool = new ConnectionPool(2, KEEP_ALIVE_DURATION_MS);
|
||||
pool.maybeShare(httpA);
|
||||
try {
|
||||
pool.share(httpA);
|
||||
fail();
|
||||
} catch (IllegalArgumentException expected) {
|
||||
}
|
||||
assertPooled(pool);
|
||||
}
|
||||
|
||||
@@ -224,7 +226,7 @@ public final class ConnectionPoolTest {
|
||||
|
||||
@Test public void validateIdleSpdyConnectionTimeout() throws Exception {
|
||||
ConnectionPool pool = new ConnectionPool(2, KEEP_ALIVE_DURATION_MS);
|
||||
pool.maybeShare(spdyA);
|
||||
pool.share(spdyA);
|
||||
Thread.sleep((int) (KEEP_ALIVE_DURATION_MS * 0.7));
|
||||
assertNull(pool.get(httpAddress));
|
||||
assertPooled(pool, spdyA); // Connection should still be in the pool.
|
||||
@@ -270,7 +272,7 @@ public final class ConnectionPoolTest {
|
||||
assertEquals(0, pool.getSpdyConnectionCount());
|
||||
|
||||
// spdy A should be added and http B should be removed.
|
||||
pool.maybeShare(spdyA);
|
||||
pool.share(spdyA);
|
||||
Thread.sleep(50);
|
||||
assertEquals(2, pool.getConnectionCount());
|
||||
assertEquals(1, pool.getHttpConnectionCount());
|
||||
@@ -299,13 +301,6 @@ public final class ConnectionPoolTest {
|
||||
assertEquals(1, pool.getHttpConnectionCount());
|
||||
assertEquals(1, pool.getSpdyConnectionCount());
|
||||
|
||||
// Nothing should change.
|
||||
pool.maybeShare(spdyB);
|
||||
Thread.sleep(50);
|
||||
assertEquals(2, pool.getConnectionCount());
|
||||
assertEquals(1, pool.getHttpConnectionCount());
|
||||
assertEquals(1, pool.getSpdyConnectionCount());
|
||||
|
||||
// An http connection should be removed from the pool.
|
||||
recycledHttpConnection = pool.get(httpAddress);
|
||||
assertNotNull(recycledHttpConnection);
|
||||
@@ -314,13 +309,6 @@ public final class ConnectionPoolTest {
|
||||
assertEquals(0, pool.getHttpConnectionCount());
|
||||
assertEquals(1, pool.getSpdyConnectionCount());
|
||||
|
||||
// Shouldn't change numbers because spdyConnections A and B user the same server address.
|
||||
pool.maybeShare(spdyB);
|
||||
Thread.sleep(50);
|
||||
assertEquals(1, pool.getConnectionCount());
|
||||
assertEquals(0, pool.getHttpConnectionCount());
|
||||
assertEquals(1, pool.getSpdyConnectionCount());
|
||||
|
||||
// spdy A will be returned and kept in the pool. Pool shouldn't change.
|
||||
sharedSpdyConnection = pool.get(spdyAddress);
|
||||
assertEquals(spdyA, sharedSpdyConnection);
|
||||
@@ -350,7 +338,7 @@ public final class ConnectionPoolTest {
|
||||
// Add 3 connections to the pool.
|
||||
pool.recycle(httpA);
|
||||
pool.recycle(httpB);
|
||||
pool.maybeShare(spdyA);
|
||||
pool.share(spdyA);
|
||||
assertEquals(3, pool.getConnectionCount());
|
||||
assertEquals(2, pool.getHttpConnectionCount());
|
||||
assertEquals(1, pool.getSpdyConnectionCount());
|
||||
@@ -383,7 +371,7 @@ public final class ConnectionPoolTest {
|
||||
pool.recycle(httpA);
|
||||
Util.closeQuietly(httpA); // Include a closed connection in the pool.
|
||||
pool.recycle(httpB);
|
||||
pool.maybeShare(spdyA);
|
||||
pool.share(spdyA);
|
||||
int connectionCount = pool.getConnectionCount();
|
||||
assertTrue(connectionCount == 2 || connectionCount == 3);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user