From dac5aed3d5f9307b9d1ab0a8c7840b232d053ef3 Mon Sep 17 00:00:00 2001 From: jwilson Date: Sat, 15 Feb 2014 18:27:19 -0500 Subject: [PATCH] Move connection pooling logic. Previously we had this ugly, awkward release() method that attempted to manage connection pooling, discarding streams for caching, and closing broken streams. Move connection reuse to HttpConnection, with policy informed by HttpEngine. It specifies what to do when the connection becomes idle: pool, close or hold. The connection does what it's told. --- .../java/com/squareup/okhttp/Connection.java | 11 +- .../com/squareup/okhttp/ConnectionPool.java | 7 +- .../main/java/com/squareup/okhttp/Job.java | 9 +- .../okhttp/internal/http/HttpConnection.java | 115 ++++++++++++------ .../okhttp/internal/http/HttpEngine.java | 88 +++++--------- .../okhttp/internal/http/HttpTransport.java | 32 ++--- .../internal/http/HttpURLConnectionImpl.java | 21 +--- .../okhttp/internal/http/RouteSelector.java | 4 +- .../okhttp/internal/http/SpdyTransport.java | 21 ++-- .../okhttp/internal/http/Transport.java | 16 ++- .../squareup/okhttp/ConnectionPoolTest.java | 56 ++++----- 11 files changed, 182 insertions(+), 198 deletions(-) diff --git a/okhttp/src/main/java/com/squareup/okhttp/Connection.java b/okhttp/src/main/java/com/squareup/okhttp/Connection.java index 32274f629..11fe7e5aa 100644 --- a/okhttp/src/main/java/com/squareup/okhttp/Connection.java +++ b/okhttp/src/main/java/com/squareup/okhttp/Connection.java @@ -65,7 +65,7 @@ import static java.net.HttpURLConnection.HTTP_PROXY_AUTH; * should the attempt fail. */ public final class Connection implements Closeable { - + private final ConnectionPool pool; private final Route route; private Socket socket; @@ -78,7 +78,8 @@ public final class Connection implements Closeable { private long idleStartTimeNs; private Handshake handshake; - public Connection(Route route) { + public Connection(ConnectionPool pool, Route route) { + this.pool = pool; this.route = route; } @@ -96,7 +97,7 @@ public final class Connection implements Closeable { upgradeToTls(tunnelRequest); } else { streamWrapper(); - httpConnection = new HttpConnection(in, out); + httpConnection = new HttpConnection(pool, this, in, out); } connected = true; } @@ -164,7 +165,7 @@ public final class Connection implements Closeable { .protocol(selectedProtocol).build(); spdyConnection.sendConnectionHeader(); } else { - httpConnection = new HttpConnection(in, out); + httpConnection = new HttpConnection(pool, this, in, out); } } @@ -306,7 +307,7 @@ public final class Connection implements Closeable { * retried if the proxy requires authorization. */ private void makeTunnel(TunnelRequest tunnelRequest) throws IOException { - HttpConnection tunnelConnection = new HttpConnection(in, out); + HttpConnection tunnelConnection = new HttpConnection(pool, this, in, out); Request request = tunnelRequest.getRequest(); String requestLine = tunnelRequest.requestLine(); while (true) { diff --git a/okhttp/src/main/java/com/squareup/okhttp/ConnectionPool.java b/okhttp/src/main/java/com/squareup/okhttp/ConnectionPool.java index 240cf83be..0b922fc79 100644 --- a/okhttp/src/main/java/com/squareup/okhttp/ConnectionPool.java +++ b/okhttp/src/main/java/com/squareup/okhttp/ConnectionPool.java @@ -244,12 +244,9 @@ public class ConnectionPool { * Shares the SPDY connection with the pool. Callers to this method may * continue to use {@code connection}. */ - public void maybeShare(Connection connection) { + public void share(Connection connection) { + if (!connection.isSpdy()) throw new IllegalArgumentException(); executorService.execute(connectionsCleanupRunnable); - if (!connection.isSpdy()) { - // Only SPDY connections are sharable. - return; - } if (connection.isAlive()) { synchronized (this) { connections.addFirst(connection); diff --git a/okhttp/src/main/java/com/squareup/okhttp/Job.java b/okhttp/src/main/java/com/squareup/okhttp/Job.java index 1a860dfdd..bd87e6ea7 100644 --- a/okhttp/src/main/java/com/squareup/okhttp/Job.java +++ b/okhttp/src/main/java/com/squareup/okhttp/Job.java @@ -80,7 +80,7 @@ final class Job extends NamedRunnable { .exception(e) .build()); } finally { - engine.release(true); // Release the connection if it isn't already released. + engine.close(); // Close the connection if it isn't already. dispatcher.finished(this); } } @@ -142,7 +142,7 @@ final class Job extends NamedRunnable { Request redirect = processResponse(engine, response); if (redirect == null) { - engine.automaticallyReleaseConnectionToPool(); + engine.releaseConnection(); return response.newBuilder() .body(new RealResponseBody(response, engine.getResponseBody())) .redirectedBy(redirectedBy) @@ -150,11 +150,10 @@ final class Job extends NamedRunnable { } if (!sameConnection(request, redirect)) { - engine.automaticallyReleaseConnectionToPool(); + engine.releaseConnection(); } - engine.release(false); - Connection connection = engine.getConnection(); + Connection connection = engine.close(); redirectedBy = response.newBuilder().redirectedBy(redirectedBy).build(); // Chained. request = redirect; engine = new HttpEngine(client, request, false, connection, null, null); diff --git a/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpConnection.java b/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpConnection.java index 173716a4c..cce881b8f 100644 --- a/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpConnection.java +++ b/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpConnection.java @@ -17,6 +17,7 @@ package com.squareup.okhttp.internal.http; import com.squareup.okhttp.Connection; +import com.squareup.okhttp.ConnectionPool; import com.squareup.okhttp.Headers; import com.squareup.okhttp.Protocol; import com.squareup.okhttp.Response; @@ -61,16 +62,59 @@ public final class HttpConnection { private static final int STATE_READING_RESPONSE_BODY = 5; private static final int STATE_CLOSED = 6; + private static final int ON_IDLE_HOLD = 0; + private static final int ON_IDLE_POOL = 1; + private static final int ON_IDLE_CLOSE = 2; + + private final ConnectionPool pool; + private final Connection connection; private final InputStream in; private final OutputStream out; private int state = STATE_IDLE; + private int onIdle = ON_IDLE_HOLD; - public HttpConnection(InputStream in, OutputStream out) { + public HttpConnection(ConnectionPool pool, Connection connection, InputStream in, + OutputStream out) { + this.pool = pool; + this.connection = connection; this.in = in; this.out = out; } + /** + * Configure this connection to put itself back into the connection pool when + * the HTTP response body is exhausted. + */ + public void poolOnIdle() { + onIdle = ON_IDLE_POOL; + + // If we're already idle, go to the pool immediately. + if (state == STATE_IDLE) { + onIdle = ON_IDLE_HOLD; // Set the on idle policy back to the default. + pool.recycle(connection); + } + } + + /** + * Configure this connection to close itself when the HTTP response body is + * exhausted. + */ + public void closeOnIdle() throws IOException { + onIdle = ON_IDLE_CLOSE; + + // If we're already idle, close immediately. + if (state == STATE_IDLE) { + state = STATE_CLOSED; + connection.close(); + } + } + + /** Returns true if this connection is closed. */ + public boolean isClosed() { + return state == STATE_CLOSED; + } + public void flush() throws IOException { out.flush(); } @@ -126,11 +170,8 @@ public final class HttpConnection { * cannot be cached unless it is consumed completely) or to enable connection * reuse. */ - private static boolean discardStream(HttpEngine httpEngine, InputStream responseBodyIn) { - Connection connection = httpEngine.getConnection(); - if (connection == null) return false; + public boolean discard(InputStream responseBodyIn) { Socket socket = connection.getSocket(); - if (socket == null) return false; try { int socketTimeout = socket.getSoTimeout(); socket.setSoTimeout(Transport.DISCARD_STREAM_TIMEOUT_MILLIS); @@ -163,38 +204,33 @@ public final class HttpConnection { requestBody.writeToSocket(out); } - public InputStream newFixedLengthInputStream( - CacheRequest cacheRequest, HttpEngine httpEngine, long length) throws IOException { + public InputStream newFixedLengthInputStream(CacheRequest cacheRequest, long length) + throws IOException { if (state != STATE_OPEN_RESPONSE_BODY) throw new IllegalStateException("state: " + state); state = STATE_READING_RESPONSE_BODY; - return new FixedLengthInputStream(in, cacheRequest, httpEngine, length); + return new FixedLengthInputStream(cacheRequest, length); } /** * Call this to advance past a response body for HTTP responses that do not * have a response body. */ - public void emptyResponseBody() { - if (state != STATE_OPEN_RESPONSE_BODY) throw new IllegalStateException("state: " + state); - state = STATE_IDLE; + public void emptyResponseBody() throws IOException { + newFixedLengthInputStream(null, 0L); // Transition to STATE_IDLE. } public InputStream newChunkedInputStream( CacheRequest cacheRequest, HttpEngine httpEngine) throws IOException { if (state != STATE_OPEN_RESPONSE_BODY) throw new IllegalStateException("state: " + state); state = STATE_READING_RESPONSE_BODY; - return new ChunkedInputStream(in, cacheRequest, httpEngine); + return new ChunkedInputStream(cacheRequest, httpEngine); } public InputStream newUnknownLengthInputStream(CacheRequest cacheRequest, HttpEngine httpEngine) throws IOException { if (state != STATE_OPEN_RESPONSE_BODY) throw new IllegalStateException("state: " + state); state = STATE_READING_RESPONSE_BODY; - return new UnknownLengthHttpInputStream(in, cacheRequest, httpEngine); - } - - public boolean discard(HttpEngine httpEngine, InputStream responseBodyIn) { - return discardStream(httpEngine, responseBodyIn); + return new UnknownLengthHttpInputStream(cacheRequest); } /** An HTTP body with a fixed length known in advance. */ @@ -347,17 +383,11 @@ public final class HttpConnection { } private class AbstractHttpInputStream extends InputStream { - protected final InputStream in; - protected final HttpEngine httpEngine; private final CacheRequest cacheRequest; protected final OutputStream cacheBody; protected boolean closed; - AbstractHttpInputStream(InputStream in, HttpEngine httpEngine, CacheRequest cacheRequest) - throws IOException { - this.in = in; - this.httpEngine = httpEngine; - + AbstractHttpInputStream(CacheRequest cacheRequest) throws IOException { OutputStream cacheBody = cacheRequest != null ? cacheRequest.getBody() : null; // Some apps return a null body; for compatibility we treat that like a null cache request. @@ -378,9 +408,7 @@ public final class HttpConnection { } protected final void checkNotClosed() throws IOException { - if (closed) { - throw new IOException("stream closed"); - } + if (closed) throw new IOException("stream closed"); } protected final void cacheWrite(byte[] buffer, int offset, int count) throws IOException { @@ -395,11 +423,19 @@ public final class HttpConnection { */ protected final void endOfInput() throws IOException { if (state != STATE_READING_RESPONSE_BODY) throw new IllegalStateException("state: " + state); + if (cacheRequest != null) { cacheBody.close(); } - httpEngine.release(false); + state = STATE_IDLE; + if (onIdle == ON_IDLE_POOL) { + onIdle = ON_IDLE_HOLD; // Set the on idle policy back to the default. + pool.recycle(connection); + } else if (onIdle == ON_IDLE_CLOSE) { + state = STATE_CLOSED; + connection.close(); + } } /** @@ -418,7 +454,7 @@ public final class HttpConnection { if (cacheRequest != null) { cacheRequest.abort(); } - httpEngine.release(true); + Util.closeQuietly(connection); state = STATE_CLOSED; } } @@ -427,9 +463,8 @@ public final class HttpConnection { private class FixedLengthInputStream extends AbstractHttpInputStream { private long bytesRemaining; - public FixedLengthInputStream(InputStream is, CacheRequest cacheRequest, HttpEngine httpEngine, - long length) throws IOException { - super(is, httpEngine, cacheRequest); + public FixedLengthInputStream(CacheRequest cacheRequest, long length) throws IOException { + super(cacheRequest); bytesRemaining = length; if (bytesRemaining == 0) { endOfInput(); @@ -464,7 +499,7 @@ public final class HttpConnection { if (closed) { return; } - if (bytesRemaining != 0 && !discardStream(httpEngine, this)) { + if (bytesRemaining != 0 && !discard(this)) { unexpectedEndOfInput(); } closed = true; @@ -476,10 +511,11 @@ public final class HttpConnection { private static final int NO_CHUNK_YET = -1; private int bytesRemainingInChunk = NO_CHUNK_YET; private boolean hasMoreChunks = true; + private final HttpEngine httpEngine; - ChunkedInputStream(InputStream is, CacheRequest cacheRequest, HttpEngine httpEngine) - throws IOException { - super(is, httpEngine, cacheRequest); + ChunkedInputStream(CacheRequest cacheRequest, HttpEngine httpEngine) throws IOException { + super(cacheRequest); + this.httpEngine = httpEngine; } @Override public int read(byte[] buffer, int offset, int count) throws IOException { @@ -541,7 +577,7 @@ public final class HttpConnection { if (closed) { return; } - if (hasMoreChunks && !discardStream(httpEngine, this)) { + if (hasMoreChunks && !discard(this)) { unexpectedEndOfInput(); } closed = true; @@ -552,9 +588,8 @@ public final class HttpConnection { class UnknownLengthHttpInputStream extends AbstractHttpInputStream { private boolean inputExhausted; - UnknownLengthHttpInputStream(InputStream in, CacheRequest cacheRequest, HttpEngine httpEngine) - throws IOException { - super(in, httpEngine, cacheRequest); + UnknownLengthHttpInputStream(CacheRequest cacheRequest) throws IOException { + super(cacheRequest); } @Override public int read(byte[] buffer, int offset, int count) throws IOException { diff --git a/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpEngine.java b/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpEngine.java index 4b36e0c62..1fdcf7ebf 100644 --- a/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpEngine.java +++ b/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpEngine.java @@ -68,11 +68,6 @@ import static java.net.HttpURLConnection.HTTP_NO_CONTENT; * *

The request and response may be served by the HTTP response cache, by the * network, or by both in the event of a conditional GET. - * - *

This class may hold a socket connection that needs to be released or - * recycled. By default, this socket connection is held when the last byte of - * the response is consumed. To release the connection when it is no longer - * required, use {@link #automaticallyReleaseConnectionToPool()}. */ public class HttpEngine { final OkHttpClient client; @@ -121,15 +116,6 @@ public class HttpEngine { /** The cache request currently being populated from a network response. */ private CacheRequest cacheRequest; - /** - * True if the socket connection should be released to the connection pool - * when the response has been fully read. - */ - private boolean automaticallyReleaseConnectionToPool; - - /** True if the socket connection is no longer needed by this engine. */ - private boolean connectionReleased; - /** * @param request the HTTP request without a body. The body must be * written via the engine's request body stream. @@ -199,9 +185,10 @@ public class HttpEngine { } } else { - // We're using a cached response. Close the connection we may have inherited from a redirect. + // We're using a cached response. Recycle a connection we may have inherited from a redirect. if (connection != null) { - disconnect(); + client.getConnectionPool().recycle(connection); + connection = null; } // No need for the network! Promote the cached response immediately. @@ -243,7 +230,7 @@ public class HttpEngine { if (!connection.isConnected()) { connection.connect(client.getConnectTimeout(), client.getReadTimeout(), getTunnelConfig()); - client.getConnectionPool().maybeShare(connection); + if (connection.isSpdy()) client.getConnectionPool().share(connection); client.getRoutesDatabase().connected(connection.getRoute()); } else if (!connection.isSpdy()) { connection.updateReadTimeout(client.getReadTimeout()); @@ -252,15 +239,6 @@ public class HttpEngine { route = connection.getRoute(); } - /** - * Recycle the connection to the origin server. It is an error to call this - * with a request in flight. - */ - private void disconnect() { - client.getConnectionPool().recycle(connection); - connection = null; - } - /** * Called immediately before the transport transmits HTTP request headers. * This is used to observe the sent time should the request be cached. @@ -324,10 +302,10 @@ public class HttpEngine { return null; } - release(true); + Connection connection = close(); // For failure recovery, use the same route selector with a new connection. - return new HttpEngine(client, request, bufferRequestBody, null, routeSelector, + return new HttpEngine(client, request, bufferRequestBody, connection, routeSelector, (RetryableOutputStream) requestBodyOut); } @@ -363,42 +341,42 @@ public class HttpEngine { } /** - * Cause the socket connection to be released to the connection pool when - * it is no longer needed. If it is already unneeded, it will be pooled - * immediately. Otherwise the connection is held so that redirects can be - * handled by the same connection. + * Configure the socket connection to be either pooled or closed when it is + * either exhausted or closed. If it is unneeded when this is called, it will + * be released immediately. */ - public final void automaticallyReleaseConnectionToPool() { - automaticallyReleaseConnectionToPool = true; - if (connection != null && connectionReleased) { - disconnect(); + public final void releaseConnection() throws IOException { + if (transport != null) { + transport.releaseConnectionOnIdle(); } + connection = null; } /** - * Releases this engine so that its resources may be either reused or - * closed. Also call {@link #automaticallyReleaseConnectionToPool} unless - * the connection will be used to follow a redirect. + * Release any resources held by this engine. If a connection is still held by + * this engine, it is returned. */ - public final void release(boolean streamCanceled) { - // If the response body comes from the cache, close it. - if (validatingResponse != null - && validatingResponse.body() != null - && responseBodyIn == validatingResponse.body().byteStream()) { - closeQuietly(responseBodyIn); + public final Connection close() { + // If this engine never achieved a response body, its connection cannot be reused. + if (responseBodyIn == null) { + closeQuietly(connection); + connection = null; + return null; } - if (connection != null && !connectionReleased) { - connectionReleased = true; + // Close the response body. This will recycle the connection if it is eligible. + closeQuietly(responseBodyIn); - if (transport == null - || !transport.makeReusable(streamCanceled, requestBodyOut, responseTransferIn)) { - closeQuietly(connection); - connection = null; - } else if (automaticallyReleaseConnectionToPool) { - disconnect(); - } + // Close the connection if it cannot be reused. + if (transport != null && !transport.canReuseConnection()) { + closeQuietly(connection); + connection = null; + return null; } + + Connection result = connection; + connection = null; + return result; } /** @@ -553,7 +531,7 @@ public class HttpEngine { if (responseSource == ResponseSource.CONDITIONAL_CACHE) { if (validatingResponse.validate(response)) { transport.emptyTransferStream(); - release(false); + releaseConnection(); response = combine(validatingResponse, response); // Update the cache after combining headers but before stripping the diff --git a/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpTransport.java b/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpTransport.java index 17a621fc3..5e2592799 100644 --- a/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpTransport.java +++ b/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpTransport.java @@ -18,7 +18,6 @@ package com.squareup.okhttp.internal.http; import com.squareup.okhttp.Request; import com.squareup.okhttp.Response; -import com.squareup.okhttp.internal.AbstractOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -104,46 +103,39 @@ public final class HttpTransport implements Transport { return httpConnection.readResponse(); } - public boolean makeReusable(boolean streamCanceled, OutputStream requestBodyOut, - InputStream responseBodyIn) { - if (streamCanceled) { - return false; - } - - // We cannot reuse sockets that have incomplete output. - if (requestBodyOut != null && !((AbstractOutputStream) requestBodyOut).isClosed()) { - return false; + @Override public void releaseConnectionOnIdle() throws IOException { + if (canReuseConnection()) { + httpConnection.poolOnIdle(); + } else { + httpConnection.closeOnIdle(); } + } + @Override public boolean canReuseConnection() { // If the request specified that the connection shouldn't be reused, don't reuse it. if ("close".equalsIgnoreCase(httpEngine.getRequest().header("Connection"))) { return false; } // If the response specified that the connection shouldn't be reused, don't reuse it. - if (httpEngine.getResponse() != null - && "close".equalsIgnoreCase(httpEngine.getResponse().header("Connection"))) { + if ("close".equalsIgnoreCase(httpEngine.getResponse().header("Connection"))) { return false; } - if (responseBodyIn instanceof HttpConnection.UnknownLengthHttpInputStream) { + if (httpConnection.isClosed()) { return false; } - if (responseBodyIn != null) { - return httpConnection.discard(httpEngine, responseBodyIn); - } - return true; } - @Override public void emptyTransferStream() { + @Override public void emptyTransferStream() throws IOException { httpConnection.emptyResponseBody(); } @Override public InputStream getTransferStream(CacheRequest cacheRequest) throws IOException { if (!httpEngine.hasResponseBody()) { - return httpConnection.newFixedLengthInputStream(cacheRequest, httpEngine, 0); + return httpConnection.newFixedLengthInputStream(cacheRequest, 0); } if ("chunked".equalsIgnoreCase(httpEngine.getResponse().header("Transfer-Encoding"))) { @@ -152,7 +144,7 @@ public final class HttpTransport implements Transport { long contentLength = OkHeaders.contentLength(httpEngine.getResponse()); if (contentLength != -1) { - return httpConnection.newFixedLengthInputStream(cacheRequest, httpEngine, contentLength); + return httpConnection.newFixedLengthInputStream(cacheRequest, contentLength); } // Wrap the input stream from the connection (rather than just returning diff --git a/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpURLConnectionImpl.java b/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpURLConnectionImpl.java index 1876772ab..29a60103c 100644 --- a/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpURLConnectionImpl.java +++ b/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpURLConnectionImpl.java @@ -24,9 +24,9 @@ import com.squareup.okhttp.Protocol; import com.squareup.okhttp.Request; import com.squareup.okhttp.Response; import com.squareup.okhttp.Route; -import com.squareup.okhttp.internal.bytes.ByteString; import com.squareup.okhttp.internal.Platform; import com.squareup.okhttp.internal.Util; +import com.squareup.okhttp.internal.bytes.ByteString; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; @@ -101,15 +101,7 @@ public class HttpURLConnectionImpl extends HttpURLConnection { @Override public final void disconnect() { // Calling disconnect() before a connection exists should have no effect. if (httpEngine != null) { - // We close the response body here instead of in - // HttpEngine.release because that is called when input - // has been completely read from the underlying socket. - // However the response body can be a GZIPInputStream that - // still has unread data. - if (httpEngine.hasResponse()) { - Util.closeQuietly(httpEngine.getResponseBody()); - } - httpEngine.release(true); + httpEngine.close(); } } @@ -327,7 +319,7 @@ public class HttpURLConnectionImpl extends HttpURLConnection { Retry retry = processResponseHeaders(); if (retry == Retry.NONE) { - httpEngine.automaticallyReleaseConnectionToPool(); + httpEngine.releaseConnection(); return httpEngine; } @@ -353,12 +345,11 @@ public class HttpURLConnectionImpl extends HttpURLConnection { } if (retry == Retry.DIFFERENT_CONNECTION) { - httpEngine.automaticallyReleaseConnectionToPool(); + httpEngine.releaseConnection(); } - httpEngine.release(false); - httpEngine = newHttpEngine(retryMethod, httpEngine.getConnection(), - (RetryableOutputStream) requestBody); + Connection connection = httpEngine.close(); + httpEngine = newHttpEngine(retryMethod, connection, (RetryableOutputStream) requestBody); } } diff --git a/okhttp/src/main/java/com/squareup/okhttp/internal/http/RouteSelector.java b/okhttp/src/main/java/com/squareup/okhttp/internal/http/RouteSelector.java index 3f1dc9a2d..09d710031 100644 --- a/okhttp/src/main/java/com/squareup/okhttp/internal/http/RouteSelector.java +++ b/okhttp/src/main/java/com/squareup/okhttp/internal/http/RouteSelector.java @@ -117,7 +117,7 @@ public final class RouteSelector { if (!hasNextPostponed()) { throw new NoSuchElementException(); } - return new Connection(nextPostponed()); + return new Connection(pool, nextPostponed()); } lastProxy = nextProxy(); resetNextInetSocketAddress(lastProxy); @@ -135,7 +135,7 @@ public final class RouteSelector { return next(method); } - return new Connection(route); + return new Connection(pool, route); } /** diff --git a/okhttp/src/main/java/com/squareup/okhttp/internal/http/SpdyTransport.java b/okhttp/src/main/java/com/squareup/okhttp/internal/http/SpdyTransport.java index f388a3d23..1bb29d5e9 100644 --- a/okhttp/src/main/java/com/squareup/okhttp/internal/http/SpdyTransport.java +++ b/okhttp/src/main/java/com/squareup/okhttp/internal/http/SpdyTransport.java @@ -195,9 +195,11 @@ public final class SpdyTransport implements Transport { return new SpdyInputStream(stream, cacheRequest, httpEngine); } - @Override public boolean makeReusable(boolean streamCanceled, OutputStream requestBodyOut, - InputStream responseBodyIn) { - return true; // SPDY sockets are always reusable. + @Override public void releaseConnectionOnIdle() { + } + + @Override public boolean canReuseConnection() { + return true; // TODO: spdyConnection.isClosed() ? } /** When true, this header should not be emitted or consumed. */ @@ -242,15 +244,12 @@ public final class SpdyTransport implements Transport { */ abstract static class AbstractHttpInputStream extends InputStream { protected final InputStream in; - protected final HttpEngine httpEngine; private final CacheRequest cacheRequest; protected final OutputStream cacheBody; protected boolean closed; - AbstractHttpInputStream(InputStream in, HttpEngine httpEngine, CacheRequest cacheRequest) - throws IOException { + AbstractHttpInputStream(InputStream in, CacheRequest cacheRequest) throws IOException { this.in = in; - this.httpEngine = httpEngine; OutputStream cacheBody = cacheRequest != null ? cacheRequest.getBody() : null; @@ -272,9 +271,7 @@ public final class SpdyTransport implements Transport { } protected final void checkNotClosed() throws IOException { - if (closed) { - throw new IOException("stream closed"); - } + if (closed) throw new IOException("stream closed"); } protected final void cacheWrite(byte[] buffer, int offset, int count) throws IOException { @@ -291,7 +288,6 @@ public final class SpdyTransport implements Transport { if (cacheRequest != null) { cacheBody.close(); } - httpEngine.release(false); } /** @@ -310,7 +306,6 @@ public final class SpdyTransport implements Transport { if (cacheRequest != null) { cacheRequest.abort(); } - httpEngine.release(true); } } @@ -321,7 +316,7 @@ public final class SpdyTransport implements Transport { SpdyInputStream(SpdyStream stream, CacheRequest cacheRequest, HttpEngine httpEngine) throws IOException { - super(stream.getInputStream(), httpEngine, cacheRequest); + super(stream.getInputStream(), cacheRequest); this.stream = stream; } diff --git a/okhttp/src/main/java/com/squareup/okhttp/internal/http/Transport.java b/okhttp/src/main/java/com/squareup/okhttp/internal/http/Transport.java index 6ab39b77b..2230cdd3e 100644 --- a/okhttp/src/main/java/com/squareup/okhttp/internal/http/Transport.java +++ b/okhttp/src/main/java/com/squareup/okhttp/internal/http/Transport.java @@ -65,12 +65,20 @@ interface Transport { Response.Builder readResponseHeaders() throws IOException; /** Notify the transport that no response body will be read. */ - void emptyTransferStream(); + void emptyTransferStream() throws IOException; // TODO: make this the content stream? InputStream getTransferStream(CacheRequest cacheRequest) throws IOException; - /** Returns true if the underlying connection can be recycled. */ - boolean makeReusable(boolean streamCanceled, OutputStream requestBodyOut, - InputStream responseBodyIn); + /** + * Configures the response body to pool or close the socket connection when + * the response body is closed. + */ + void releaseConnectionOnIdle() throws IOException; + + /** + * Returns true if the socket connection held by this transport can be reused + * for a follow-up exchange. + */ + boolean canReuseConnection(); } diff --git a/okhttp/src/test/java/com/squareup/okhttp/ConnectionPoolTest.java b/okhttp/src/test/java/com/squareup/okhttp/ConnectionPoolTest.java index 99e7aa3a9..f25e5a2bd 100644 --- a/okhttp/src/test/java/com/squareup/okhttp/ConnectionPoolTest.java +++ b/okhttp/src/test/java/com/squareup/okhttp/ConnectionPoolTest.java @@ -35,6 +35,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public final class ConnectionPoolTest { private static final int KEEP_ALIVE_DURATION_MS = 5000; @@ -54,7 +55,6 @@ public final class ConnectionPoolTest { private Connection httpD; private Connection httpE; private Connection spdyA; - private Connection spdyB; @Before public void setUp() throws Exception { spdyServer.useHttps(sslContext.getSocketFactory(), false); @@ -74,20 +74,18 @@ public final class ConnectionPoolTest { Route httpRoute = new Route(httpAddress, Proxy.NO_PROXY, httpSocketAddress, true); Route spdyRoute = new Route(spdyAddress, Proxy.NO_PROXY, spdySocketAddress, true); - httpA = new Connection(httpRoute); + httpA = new Connection(null, httpRoute); httpA.connect(200, 200, null); - httpB = new Connection(httpRoute); + httpB = new Connection(null, httpRoute); httpB.connect(200, 200, null); - httpC = new Connection(httpRoute); + httpC = new Connection(null, httpRoute); httpC.connect(200, 200, null); - httpD = new Connection(httpRoute); + httpD = new Connection(null, httpRoute); httpD.connect(200, 200, null); - httpE = new Connection(httpRoute); + httpE = new Connection(null, httpRoute); httpE.connect(200, 200, null); - spdyA = new Connection(spdyRoute); - spdyA.connect(200, 200, null); - spdyB = new Connection(spdyRoute); - spdyB.connect(200, 200, null); + spdyA = new Connection(null, spdyRoute); + spdyA.connect(20000, 20000, null); } @After public void tearDown() throws Exception { @@ -100,7 +98,6 @@ public final class ConnectionPoolTest { Util.closeQuietly(httpD); Util.closeQuietly(httpE); Util.closeQuietly(spdyA); - Util.closeQuietly(spdyB); } @Test public void poolSingleHttpConnection() throws IOException { @@ -108,7 +105,8 @@ public final class ConnectionPoolTest { Connection connection = pool.get(httpAddress); assertNull(connection); - connection = new Connection(new Route(httpAddress, Proxy.NO_PROXY, httpSocketAddress, true)); + connection = new Connection( + null, new Route(httpAddress, Proxy.NO_PROXY, httpSocketAddress, true)); connection.connect(200, 200, null); assertEquals(0, pool.getConnectionCount()); pool.recycle(connection); @@ -134,7 +132,7 @@ public final class ConnectionPoolTest { @Test public void getSpdyConnection() throws Exception { ConnectionPool pool = new ConnectionPool(2, KEEP_ALIVE_DURATION_MS); - pool.maybeShare(spdyA); + pool.share(spdyA); assertSame(spdyA, pool.get(spdyAddress)); assertPooled(pool, spdyA); } @@ -189,7 +187,7 @@ public final class ConnectionPoolTest { @Test public void gettingSpdyConnectionPromotesItToFrontOfQueue() throws Exception { ConnectionPool pool = new ConnectionPool(2, KEEP_ALIVE_DURATION_MS); - pool.maybeShare(spdyA); + pool.share(spdyA); pool.recycle(httpA); assertPooled(pool, httpA, spdyA); assertSame(spdyA, pool.get(spdyAddress)); @@ -210,9 +208,13 @@ public final class ConnectionPoolTest { assertTrue(httpA.getSocket().isClosed()); } - @Test public void shareHttpConnectionDoesNothing() throws Exception { + @Test public void shareHttpConnectionFails() throws Exception { ConnectionPool pool = new ConnectionPool(2, KEEP_ALIVE_DURATION_MS); - pool.maybeShare(httpA); + try { + pool.share(httpA); + fail(); + } catch (IllegalArgumentException expected) { + } assertPooled(pool); } @@ -224,7 +226,7 @@ public final class ConnectionPoolTest { @Test public void validateIdleSpdyConnectionTimeout() throws Exception { ConnectionPool pool = new ConnectionPool(2, KEEP_ALIVE_DURATION_MS); - pool.maybeShare(spdyA); + pool.share(spdyA); Thread.sleep((int) (KEEP_ALIVE_DURATION_MS * 0.7)); assertNull(pool.get(httpAddress)); assertPooled(pool, spdyA); // Connection should still be in the pool. @@ -270,7 +272,7 @@ public final class ConnectionPoolTest { assertEquals(0, pool.getSpdyConnectionCount()); // spdy A should be added and http B should be removed. - pool.maybeShare(spdyA); + pool.share(spdyA); Thread.sleep(50); assertEquals(2, pool.getConnectionCount()); assertEquals(1, pool.getHttpConnectionCount()); @@ -299,13 +301,6 @@ public final class ConnectionPoolTest { assertEquals(1, pool.getHttpConnectionCount()); assertEquals(1, pool.getSpdyConnectionCount()); - // Nothing should change. - pool.maybeShare(spdyB); - Thread.sleep(50); - assertEquals(2, pool.getConnectionCount()); - assertEquals(1, pool.getHttpConnectionCount()); - assertEquals(1, pool.getSpdyConnectionCount()); - // An http connection should be removed from the pool. recycledHttpConnection = pool.get(httpAddress); assertNotNull(recycledHttpConnection); @@ -314,13 +309,6 @@ public final class ConnectionPoolTest { assertEquals(0, pool.getHttpConnectionCount()); assertEquals(1, pool.getSpdyConnectionCount()); - // Shouldn't change numbers because spdyConnections A and B user the same server address. - pool.maybeShare(spdyB); - Thread.sleep(50); - assertEquals(1, pool.getConnectionCount()); - assertEquals(0, pool.getHttpConnectionCount()); - assertEquals(1, pool.getSpdyConnectionCount()); - // spdy A will be returned and kept in the pool. Pool shouldn't change. sharedSpdyConnection = pool.get(spdyAddress); assertEquals(spdyA, sharedSpdyConnection); @@ -350,7 +338,7 @@ public final class ConnectionPoolTest { // Add 3 connections to the pool. pool.recycle(httpA); pool.recycle(httpB); - pool.maybeShare(spdyA); + pool.share(spdyA); assertEquals(3, pool.getConnectionCount()); assertEquals(2, pool.getHttpConnectionCount()); assertEquals(1, pool.getSpdyConnectionCount()); @@ -383,7 +371,7 @@ public final class ConnectionPoolTest { pool.recycle(httpA); Util.closeQuietly(httpA); // Include a closed connection in the pool. pool.recycle(httpB); - pool.maybeShare(spdyA); + pool.share(spdyA); int connectionCount = pool.getConnectionCount(); assertTrue(connectionCount == 2 || connectionCount == 3);