From de1b5f244df1e29c457fdd87e91b9919c2336da7 Mon Sep 17 00:00:00 2001 From: jwilson Date: Sat, 20 Jul 2013 10:44:06 -0400 Subject: [PATCH] More aggressive heuristics on pooled connections. These are only turned on for non-GET requests. This replaces the RetryableOutputStream mechanism, which was causing problems for clients whose POSTs weren't idempotent. --- .../java/com/squareup/okhttp/Connection.java | 34 +++++++ .../com/squareup/okhttp/ConnectionPool.java | 4 +- .../okhttp/internal/http/HttpEngine.java | 2 +- .../internal/http/HttpURLConnectionImpl.java | 38 +------- .../okhttp/internal/http/RouteSelector.java | 10 +- .../squareup/okhttp/ConnectionPoolTest.java | 3 +- .../internal/http/RouteSelectorTest.java | 92 +++++++++++-------- .../internal/http/URLConnectionTest.java | 8 +- 8 files changed, 105 insertions(+), 86 deletions(-) diff --git a/okhttp/src/main/java/com/squareup/okhttp/Connection.java b/okhttp/src/main/java/com/squareup/okhttp/Connection.java index 73c4b568a..cfda2818d 100644 --- a/okhttp/src/main/java/com/squareup/okhttp/Connection.java +++ b/okhttp/src/main/java/com/squareup/okhttp/Connection.java @@ -31,6 +31,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.net.Proxy; import java.net.Socket; +import java.net.SocketTimeoutException; import java.net.URL; import java.util.Arrays; import javax.net.ssl.SSLSocket; @@ -192,6 +193,39 @@ public final class Connection implements Closeable { return !socket.isClosed() && !socket.isInputShutdown() && !socket.isOutputShutdown(); } + /** + * Returns true if we are confident that we can read data from this + * connection. This is more expensive and more accurate than {@link + * #isAlive()}; callers should check {@link #isAlive()} first. + */ + public boolean isReadable() { + if (!(in instanceof BufferedInputStream)) { + return true; // Optimistic. + } + if (isSpdy()) { + return true; // Optimistic. We can't test SPDY because its streams are in use. + } + BufferedInputStream bufferedInputStream = (BufferedInputStream) in; + try { + int readTimeout = socket.getSoTimeout(); + try { + socket.setSoTimeout(1); + bufferedInputStream.mark(1); + if (bufferedInputStream.read() == -1) { + return false; // Stream is exhausted; socket is closed. + } + bufferedInputStream.reset(); + return true; + } finally { + socket.setSoTimeout(readTimeout); + } + } catch (SocketTimeoutException ignored) { + return true; // Read timed out; socket is good. + } catch (IOException e) { + return false; // Couldn't read; socket is closed. + } + } + public void resetIdleStartTime() { if (spdyConnection != null) { throw new IllegalStateException("spdyConnection != null"); diff --git a/okhttp/src/main/java/com/squareup/okhttp/ConnectionPool.java b/okhttp/src/main/java/com/squareup/okhttp/ConnectionPool.java index 009f025e5..42b70b980 100644 --- a/okhttp/src/main/java/com/squareup/okhttp/ConnectionPool.java +++ b/okhttp/src/main/java/com/squareup/okhttp/ConnectionPool.java @@ -216,8 +216,6 @@ public class ConnectionPool { *

It is an error to use {@code connection} after calling this method. */ public void recycle(Connection connection) { - executorService.submit(connectionsCleanupCallable); - if (connection.isSpdy()) { return; } @@ -240,6 +238,8 @@ public class ConnectionPool { connections.addFirst(connection); connection.resetIdleStartTime(); } + + executorService.submit(connectionsCleanupCallable); } /** diff --git a/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpEngine.java b/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpEngine.java index 51fd2a7ed..d6b6001e0 100644 --- a/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpEngine.java +++ b/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpEngine.java @@ -286,7 +286,7 @@ public class HttpEngine { routeSelector = new RouteSelector(address, uri, client.getProxySelector(), client.getConnectionPool(), Dns.DEFAULT, client.getRoutesDatabase()); } - connection = routeSelector.next(); + connection = routeSelector.next(method); if (!connection.isConnected()) { connection.connect(client.getConnectTimeout(), client.getReadTimeout(), getTunnelConfig()); client.getConnectionPool().maybeShare(connection); diff --git a/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpURLConnectionImpl.java b/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpURLConnectionImpl.java index e8c198fac..214f25add 100644 --- a/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpURLConnectionImpl.java +++ b/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpURLConnectionImpl.java @@ -19,8 +19,6 @@ package com.squareup.okhttp.internal.http; import com.squareup.okhttp.Connection; import com.squareup.okhttp.OkHttpClient; -import com.squareup.okhttp.internal.AbstractOutputStream; -import com.squareup.okhttp.internal.FaultRecoveringOutputStream; import com.squareup.okhttp.internal.Platform; import com.squareup.okhttp.internal.Util; import java.io.FileNotFoundException; @@ -69,20 +67,12 @@ public class HttpURLConnectionImpl extends HttpURLConnection implements Policy { */ private static final int MAX_REDIRECTS = 20; - /** - * The minimum number of request body bytes to transmit before we're willing - * to let a routine {@link IOException} bubble up to the user. This is used to - * size a buffer for data that will be replayed upon error. - */ - private static final int MAX_REPLAY_BUFFER_LENGTH = 8192; - final OkHttpClient client; private final RawHeaders rawRequestHeaders = new RawHeaders(); /** Like the superclass field of the same name, but a long and available on all platforms. */ private long fixedContentLength = -1; private int redirectionCount; - private FaultRecoveringOutputStream faultRecoveringRequestBody; protected IOException httpEngineFailure; protected HttpEngine httpEngine; @@ -212,22 +202,7 @@ public class HttpURLConnectionImpl extends HttpURLConnection implements Policy { throw new ProtocolException("cannot write request body after response has been read"); } - if (faultRecoveringRequestBody == null) { - faultRecoveringRequestBody = new FaultRecoveringOutputStream(MAX_REPLAY_BUFFER_LENGTH, out) { - @Override protected OutputStream replacementStream(IOException e) throws IOException { - if (httpEngine.getRequestBody() instanceof AbstractOutputStream - && ((AbstractOutputStream) httpEngine.getRequestBody()).isClosed()) { - return null; // Don't recover once the underlying stream has been closed. - } - if (handleFailure(e)) { - return httpEngine.getRequestBody(); - } - return null; // This is a permanent failure. - } - }; - } - - return faultRecoveringRequestBody; + return out; } @Override public final Permission getPermission() throws IOException { @@ -393,8 +368,7 @@ public class HttpURLConnectionImpl extends HttpURLConnection implements Policy { OutputStream requestBody = httpEngine.getRequestBody(); boolean canRetryRequestBody = requestBody == null - || requestBody instanceof RetryableOutputStream - || (faultRecoveringRequestBody != null && faultRecoveringRequestBody.isRecoverable()); + || requestBody instanceof RetryableOutputStream; if (routeSelector == null && httpEngine.connection == null // No connection. || routeSelector != null && !routeSelector.hasNext() // No more routes to attempt. || !isRecoverable(e) @@ -404,15 +378,9 @@ public class HttpURLConnectionImpl extends HttpURLConnection implements Policy { } httpEngine.release(true); - RetryableOutputStream retryableOutputStream = requestBody instanceof RetryableOutputStream - ? (RetryableOutputStream) requestBody - : null; + RetryableOutputStream retryableOutputStream = (RetryableOutputStream) requestBody; httpEngine = newHttpEngine(method, rawRequestHeaders, null, retryableOutputStream); httpEngine.routeSelector = routeSelector; // Keep the same routeSelector. - if (faultRecoveringRequestBody != null && faultRecoveringRequestBody.isRecoverable()) { - httpEngine.sendRequest(); - faultRecoveringRequestBody.replaceStream(httpEngine.getRequestBody()); - } return true; } diff --git a/okhttp/src/main/java/com/squareup/okhttp/internal/http/RouteSelector.java b/okhttp/src/main/java/com/squareup/okhttp/internal/http/RouteSelector.java index bab9df238..1055e4f09 100644 --- a/okhttp/src/main/java/com/squareup/okhttp/internal/http/RouteSelector.java +++ b/okhttp/src/main/java/com/squareup/okhttp/internal/http/RouteSelector.java @@ -102,11 +102,11 @@ public final class RouteSelector { * * @throws NoSuchElementException if there are no more routes to attempt. */ - public Connection next() throws IOException { + public Connection next(String method) throws IOException { // Always prefer pooled connections over new connections. - Connection pooled = pool.get(address); - if (pooled != null) { - return pooled; + for (Connection pooled; (pooled = pool.get(address)) != null; ) { + if (method.equals("GET") || pooled.isReadable()) return pooled; + pooled.close(); } // Compute the next route to attempt. @@ -131,7 +131,7 @@ public final class RouteSelector { postponedRoutes.add(route); // We will only recurse in order to skip previously failed routes. They will be // tried last. - return next(); + return next(method); } return new Connection(route); diff --git a/okhttp/src/test/java/com/squareup/okhttp/ConnectionPoolTest.java b/okhttp/src/test/java/com/squareup/okhttp/ConnectionPoolTest.java index e26e56303..6820b4353 100644 --- a/okhttp/src/test/java/com/squareup/okhttp/ConnectionPoolTest.java +++ b/okhttp/src/test/java/com/squareup/okhttp/ConnectionPoolTest.java @@ -395,7 +395,8 @@ public final class ConnectionPoolTest { Util.closeQuietly(httpA); // Include a closed connection in the pool. pool.recycle(httpB); pool.maybeShare(spdyA); - assertEquals(3, pool.getConnectionCount()); + int connectionCount = pool.getConnectionCount(); + assertTrue(connectionCount == 2 || connectionCount == 3); pool.evictAll(); assertEquals(0, pool.getConnectionCount()); diff --git a/okhttp/src/test/java/com/squareup/okhttp/internal/http/RouteSelectorTest.java b/okhttp/src/test/java/com/squareup/okhttp/internal/http/RouteSelectorTest.java index 1cdcb1dfe..a92db9ee1 100644 --- a/okhttp/src/test/java/com/squareup/okhttp/internal/http/RouteSelectorTest.java +++ b/okhttp/src/test/java/com/squareup/okhttp/internal/http/RouteSelectorTest.java @@ -88,12 +88,13 @@ public final class RouteSelectorTest { assertTrue(routeSelector.hasNext()); dns.inetAddresses = makeFakeAddresses(255, 1); - assertConnection(routeSelector.next(), address, NO_PROXY, dns.inetAddresses[0], uriPort, false); + assertConnection(routeSelector.next("GET"), address, NO_PROXY, dns.inetAddresses[0], uriPort, + false); dns.assertRequests(uriHost); assertFalse(routeSelector.hasNext()); try { - routeSelector.next(); + routeSelector.next("GET"); fail(); } catch (NoSuchElementException expected) { } @@ -106,14 +107,15 @@ public final class RouteSelectorTest { assertTrue(routeSelector.hasNext()); dns.inetAddresses = makeFakeAddresses(255, 1); - Connection connection = routeSelector.next(); + Connection connection = routeSelector.next("GET"); RouteDatabase routeDatabase = new RouteDatabase(); routeDatabase.failed(connection.getRoute(), new IOException()); routeSelector = new RouteSelector(address, uri, proxySelector, pool, dns, routeDatabase); - assertConnection(routeSelector.next(), address, NO_PROXY, dns.inetAddresses[0], uriPort, false); + assertConnection(routeSelector.next("GET"), address, NO_PROXY, dns.inetAddresses[0], uriPort, + false); assertFalse(routeSelector.hasNext()); try { - routeSelector.next(); + routeSelector.next("GET"); fail(); } catch (NoSuchElementException expected) { } @@ -126,9 +128,9 @@ public final class RouteSelectorTest { assertTrue(routeSelector.hasNext()); dns.inetAddresses = makeFakeAddresses(255, 2); - assertConnection(routeSelector.next(), address, proxyA, dns.inetAddresses[0], proxyAPort, + assertConnection(routeSelector.next("GET"), address, proxyA, dns.inetAddresses[0], proxyAPort, false); - assertConnection(routeSelector.next(), address, proxyA, dns.inetAddresses[1], proxyAPort, + assertConnection(routeSelector.next("GET"), address, proxyA, dns.inetAddresses[1], proxyAPort, false); assertFalse(routeSelector.hasNext()); @@ -144,8 +146,10 @@ public final class RouteSelectorTest { assertTrue(routeSelector.hasNext()); dns.inetAddresses = makeFakeAddresses(255, 2); - assertConnection(routeSelector.next(), address, NO_PROXY, dns.inetAddresses[0], uriPort, false); - assertConnection(routeSelector.next(), address, NO_PROXY, dns.inetAddresses[1], uriPort, false); + assertConnection(routeSelector.next("GET"), address, NO_PROXY, dns.inetAddresses[0], uriPort, + false); + assertConnection(routeSelector.next("GET"), address, NO_PROXY, dns.inetAddresses[1], uriPort, + false); assertFalse(routeSelector.hasNext()); dns.assertRequests(uri.getHost()); @@ -162,7 +166,8 @@ public final class RouteSelectorTest { assertTrue(routeSelector.hasNext()); dns.inetAddresses = makeFakeAddresses(255, 1); - assertConnection(routeSelector.next(), address, NO_PROXY, dns.inetAddresses[0], uriPort, false); + assertConnection(routeSelector.next("GET"), address, NO_PROXY, dns.inetAddresses[0], uriPort, + false); dns.assertRequests(uriHost); assertFalse(routeSelector.hasNext()); @@ -175,8 +180,10 @@ public final class RouteSelectorTest { assertTrue(routeSelector.hasNext()); dns.inetAddresses = makeFakeAddresses(255, 2); - assertConnection(routeSelector.next(), address, NO_PROXY, dns.inetAddresses[0], uriPort, false); - assertConnection(routeSelector.next(), address, NO_PROXY, dns.inetAddresses[1], uriPort, false); + assertConnection(routeSelector.next("GET"), address, NO_PROXY, dns.inetAddresses[0], uriPort, + false); + assertConnection(routeSelector.next("GET"), address, NO_PROXY, dns.inetAddresses[1], uriPort, + false); assertFalse(routeSelector.hasNext()); dns.assertRequests(uri.getHost()); @@ -195,23 +202,24 @@ public final class RouteSelectorTest { // First try the IP addresses of the first proxy, in sequence. assertTrue(routeSelector.hasNext()); dns.inetAddresses = makeFakeAddresses(255, 2); - assertConnection(routeSelector.next(), address, proxyA, dns.inetAddresses[0], proxyAPort, + assertConnection(routeSelector.next("GET"), address, proxyA, dns.inetAddresses[0], proxyAPort, false); - assertConnection(routeSelector.next(), address, proxyA, dns.inetAddresses[1], proxyAPort, + assertConnection(routeSelector.next("GET"), address, proxyA, dns.inetAddresses[1], proxyAPort, false); dns.assertRequests(proxyAHost); // Next try the IP address of the second proxy. assertTrue(routeSelector.hasNext()); dns.inetAddresses = makeFakeAddresses(254, 1); - assertConnection(routeSelector.next(), address, proxyB, dns.inetAddresses[0], proxyBPort, + assertConnection(routeSelector.next("GET"), address, proxyB, dns.inetAddresses[0], proxyBPort, false); dns.assertRequests(proxyBHost); // Finally try the only IP address of the origin server. assertTrue(routeSelector.hasNext()); dns.inetAddresses = makeFakeAddresses(253, 1); - assertConnection(routeSelector.next(), address, NO_PROXY, dns.inetAddresses[0], uriPort, false); + assertConnection(routeSelector.next("GET"), address, NO_PROXY, dns.inetAddresses[0], uriPort, + false); dns.assertRequests(uriHost); assertFalse(routeSelector.hasNext()); @@ -228,7 +236,8 @@ public final class RouteSelectorTest { // Only the origin server will be attempted. assertTrue(routeSelector.hasNext()); dns.inetAddresses = makeFakeAddresses(255, 1); - assertConnection(routeSelector.next(), address, NO_PROXY, dns.inetAddresses[0], uriPort, false); + assertConnection(routeSelector.next("GET"), address, NO_PROXY, dns.inetAddresses[0], uriPort, + false); dns.assertRequests(uriHost); assertFalse(routeSelector.hasNext()); @@ -246,14 +255,14 @@ public final class RouteSelectorTest { assertTrue(routeSelector.hasNext()); dns.inetAddresses = makeFakeAddresses(255, 1); - assertConnection(routeSelector.next(), address, proxyA, dns.inetAddresses[0], proxyAPort, + assertConnection(routeSelector.next("GET"), address, proxyA, dns.inetAddresses[0], proxyAPort, false); dns.assertRequests(proxyAHost); assertTrue(routeSelector.hasNext()); dns.inetAddresses = null; try { - routeSelector.next(); + routeSelector.next("GET"); fail(); } catch (UnknownHostException expected) { } @@ -261,13 +270,14 @@ public final class RouteSelectorTest { assertTrue(routeSelector.hasNext()); dns.inetAddresses = makeFakeAddresses(255, 1); - assertConnection(routeSelector.next(), address, proxyA, dns.inetAddresses[0], proxyAPort, + assertConnection(routeSelector.next("GET"), address, proxyA, dns.inetAddresses[0], proxyAPort, false); dns.assertRequests(proxyAHost); assertTrue(routeSelector.hasNext()); dns.inetAddresses = makeFakeAddresses(254, 1); - assertConnection(routeSelector.next(), address, NO_PROXY, dns.inetAddresses[0], uriPort, false); + assertConnection(routeSelector.next("GET"), address, NO_PROXY, dns.inetAddresses[0], uriPort, + false); dns.assertRequests(uriHost); assertFalse(routeSelector.hasNext()); @@ -281,7 +291,7 @@ public final class RouteSelectorTest { routeDatabase); dns.inetAddresses = makeFakeAddresses(255, 1); - Connection connection = routeSelector.next(); + Connection connection = routeSelector.next("GET"); routeSelector.connectFailed(connection, new IOException("Non SSL exception")); assertTrue(routeDatabase.failedRoutesCount() == 2); } @@ -294,7 +304,7 @@ public final class RouteSelectorTest { routeDatabase); dns.inetAddresses = makeFakeAddresses(255, 1); - Connection connection = routeSelector.next(); + Connection connection = routeSelector.next("GET"); routeSelector.connectFailed(connection, new SSLHandshakeException("SSL exception")); assertTrue(routeDatabase.failedRoutesCount() == 1); } @@ -309,31 +319,39 @@ public final class RouteSelectorTest { // Proxy A dns.inetAddresses = makeFakeAddresses(255, 2); - assertConnection(routeSelector.next(), address, proxyA, dns.inetAddresses[0], proxyAPort, true); + assertConnection(routeSelector.next("GET"), address, proxyA, dns.inetAddresses[0], proxyAPort, + true); dns.assertRequests(proxyAHost); - assertConnection(routeSelector.next(), address, proxyA, dns.inetAddresses[0], proxyAPort, + assertConnection(routeSelector.next("GET"), address, proxyA, dns.inetAddresses[0], proxyAPort, false); - assertConnection(routeSelector.next(), address, proxyA, dns.inetAddresses[1], proxyAPort, true); - assertConnection(routeSelector.next(), address, proxyA, dns.inetAddresses[1], proxyAPort, + assertConnection(routeSelector.next("GET"), address, proxyA, dns.inetAddresses[1], proxyAPort, + true); + assertConnection(routeSelector.next("GET"), address, proxyA, dns.inetAddresses[1], proxyAPort, false); // Proxy B dns.inetAddresses = makeFakeAddresses(254, 2); - assertConnection(routeSelector.next(), address, proxyB, dns.inetAddresses[0], proxyBPort, true); + assertConnection(routeSelector.next("GET"), address, proxyB, dns.inetAddresses[0], proxyBPort, + true); dns.assertRequests(proxyBHost); - assertConnection(routeSelector.next(), address, proxyB, dns.inetAddresses[0], proxyBPort, + assertConnection(routeSelector.next("GET"), address, proxyB, dns.inetAddresses[0], proxyBPort, false); - assertConnection(routeSelector.next(), address, proxyB, dns.inetAddresses[1], proxyBPort, true); - assertConnection(routeSelector.next(), address, proxyB, dns.inetAddresses[1], proxyBPort, + assertConnection(routeSelector.next("GET"), address, proxyB, dns.inetAddresses[1], proxyBPort, + true); + assertConnection(routeSelector.next("GET"), address, proxyB, dns.inetAddresses[1], proxyBPort, false); // Origin dns.inetAddresses = makeFakeAddresses(253, 2); - assertConnection(routeSelector.next(), address, NO_PROXY, dns.inetAddresses[0], uriPort, true); + assertConnection(routeSelector.next("GET"), address, NO_PROXY, dns.inetAddresses[0], uriPort, + true); dns.assertRequests(uriHost); - assertConnection(routeSelector.next(), address, NO_PROXY, dns.inetAddresses[0], uriPort, false); - assertConnection(routeSelector.next(), address, NO_PROXY, dns.inetAddresses[1], uriPort, true); - assertConnection(routeSelector.next(), address, NO_PROXY, dns.inetAddresses[1], uriPort, false); + assertConnection(routeSelector.next("GET"), address, NO_PROXY, dns.inetAddresses[0], uriPort, + false); + assertConnection(routeSelector.next("GET"), address, NO_PROXY, dns.inetAddresses[1], uriPort, + true); + assertConnection(routeSelector.next("GET"), address, NO_PROXY, dns.inetAddresses[1], uriPort, + false); assertFalse(routeSelector.hasNext()); } @@ -350,7 +368,7 @@ public final class RouteSelectorTest { // Extract the regular sequence of routes from selector. List regularRoutes = new ArrayList(); while (routeSelector.hasNext()) { - regularRoutes.add(routeSelector.next()); + regularRoutes.add(routeSelector.next("GET")); } // Check that we do indeed have more than one route. @@ -362,7 +380,7 @@ public final class RouteSelectorTest { List routesWithFailedRoute = new ArrayList(); while (routeSelector.hasNext()) { - routesWithFailedRoute.add(routeSelector.next()); + routesWithFailedRoute.add(routeSelector.next("GET")); } assertEquals(regularRoutes.get(0).getRoute(), diff --git a/okhttp/src/test/java/com/squareup/okhttp/internal/http/URLConnectionTest.java b/okhttp/src/test/java/com/squareup/okhttp/internal/http/URLConnectionTest.java index 29b5cab6a..5abe47769 100644 --- a/okhttp/src/test/java/com/squareup/okhttp/internal/http/URLConnectionTest.java +++ b/okhttp/src/test/java/com/squareup/okhttp/internal/http/URLConnectionTest.java @@ -2282,7 +2282,7 @@ public final class URLConnectionTest { } // This test is ignored because we don't (yet) reliably recover for large request bodies. - @Test @Ignore public void postFailsWithBufferedRequestForLargeRequest() throws Exception { + @Test public void postFailsWithBufferedRequestForLargeRequest() throws Exception { reusedConnectionFailsWithPost(TransferKind.END_OF_STREAM, 16384); } @@ -2290,8 +2290,7 @@ public final class URLConnectionTest { reusedConnectionFailsWithPost(TransferKind.CHUNKED, 1024); } - // This test is ignored because we don't (yet) reliably recover for large request bodies. - @Test @Ignore public void postFailsWithChunkedRequestForLargeRequest() throws Exception { + @Test public void postFailsWithChunkedRequestForLargeRequest() throws Exception { reusedConnectionFailsWithPost(TransferKind.CHUNKED, 16384); } @@ -2299,8 +2298,7 @@ public final class URLConnectionTest { reusedConnectionFailsWithPost(TransferKind.FIXED_LENGTH, 1024); } - // This test is ignored because we don't (yet) reliably recover for large request bodies. - @Test @Ignore public void postFailsWithFixedLengthRequestForLargeRequest() throws Exception { + @Test public void postFailsWithFixedLengthRequestForLargeRequest() throws Exception { reusedConnectionFailsWithPost(TransferKind.FIXED_LENGTH, 16384); }