diff --git a/src/main/java/com/squareup/okhttp/Connection.java b/src/main/java/com/squareup/okhttp/Connection.java index aaac65083..d6a7e53e3 100644 --- a/src/main/java/com/squareup/okhttp/Connection.java +++ b/src/main/java/com/squareup/okhttp/Connection.java @@ -83,10 +83,10 @@ public final class Connection implements Closeable { private Socket socket; private InputStream in; private OutputStream out; - private boolean recycled = false; private boolean connected = false; private SpdyConnection spdyConnection; private int httpMinorVersion = 1; // Assume HTTP/1.1 + private long idleStartTimeNs; public Connection(Address address, Proxy proxy, InetSocketAddress inetSocketAddress, boolean modernTls) { @@ -219,29 +219,40 @@ public final class Connection implements Closeable { } /** - * Returns true if this connection has been used to satisfy an earlier - * HTTP request/response pair. - * - *

The HTTP client treats recycled and non-recycled connections - * differently. I/O failures on recycled connections are often temporary: - * the remote peer may have closed the socket because it was idle for an - * extended period of time. When fresh connections suffer similar failures - * the problem is fatal and the request is not retried. + * Returns true if this connection is alive. */ - public boolean isRecycled() { - return recycled; + public boolean isAlive() { + return !socket.isClosed() && !socket.isInputShutdown() && !socket.isOutputShutdown(); } - public void setRecycled() { - this.recycled = true; + public void resetIdleStartTime() { + if (spdyConnection != null) { + throw new IllegalStateException("spdyConnection != null"); + } + this.idleStartTimeNs = System.nanoTime(); } /** - * Returns true if this connection is eligible to be reused for another - * request/response pair. + * Returns true if this connection is idle. */ - protected boolean isEligibleForRecycling() { - return !socket.isClosed() && !socket.isInputShutdown() && !socket.isOutputShutdown(); + public boolean isIdle() { + return spdyConnection == null || spdyConnection.isIdle(); + } + + /** + * Returns true if this connection has been idle for longer than + * {@code keepAliveDurationNs}. + */ + public boolean isExpired(long keepAliveDurationNs) { + return isIdle() && System.nanoTime() - getIdleStartTimeNs() > keepAliveDurationNs; + } + + /** + * Returns the time in ns when this connection became idle. Undefined if + * this connection is not idle. + */ + public long getIdleStartTimeNs() { + return spdyConnection == null ? idleStartTimeNs : spdyConnection.getIdleStartTimeNs(); } /** @@ -261,7 +272,11 @@ public final class Connection implements Closeable { return spdyConnection != null; } - /** + public SpdyConnection getSpdyConnection() { + return spdyConnection; + } + + /** * Returns the minor HTTP version that should be used for future requests on * this connection. Either 0 for HTTP/1.0, or 1 for HTTP/1.1. The default * value is 1 for new connections. diff --git a/src/main/java/com/squareup/okhttp/ConnectionPool.java b/src/main/java/com/squareup/okhttp/ConnectionPool.java index 408034994..b32c9d2ac 100644 --- a/src/main/java/com/squareup/okhttp/ConnectionPool.java +++ b/src/main/java/com/squareup/okhttp/ConnectionPool.java @@ -1,173 +1,244 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ package com.squareup.okhttp; import com.squareup.okhttp.internal.Platform; import com.squareup.okhttp.internal.Util; import java.net.SocketException; import java.util.ArrayList; -import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** - * Manages reuse of HTTP and SPDY connections for reduced network latency. HTTP - * requests that share the same {@link Address} may share a {@link Connection}. - * This class implements the policy of which connections to keep open for future - * use. + * Manages reuse of HTTP and SPDY connections for reduced network latency. HTTP requests that share + * the same {@link com.squareup.okhttp.Address} may share a {@link com.squareup.okhttp.Connection}. + * This class implements the policy of which connections to keep open for future use. * - *

The {@link #getDefault() system-wide default} uses system properties for - * tuning parameters: + *

The {@link #getDefault() system-wide default} uses system properties for tuning parameters: *

* - *

The default instance doesn't adjust its configuration as system - * properties are changed. This assumes that the applications that set these - * parameters do so before making HTTP connections, and that this class is - * initialized lazily. + *

The default instance doesn't adjust its configuration as system properties are changed. + * This assumes that the applications that set these parameters do so before making HTTP + * connections, and that this class is initialized lazily. */ -public final class ConnectionPool { - private static final ConnectionPool systemDefault; - static { - String keepAlive = System.getProperty("http.keepAlive"); - String maxConnections = System.getProperty("http.maxConnections"); - if (keepAlive != null && !Boolean.parseBoolean(keepAlive)) { - systemDefault = new ConnectionPool(0); - } else if (maxConnections != null) { - systemDefault = new ConnectionPool(Integer.parseInt(maxConnections)); - } else { - systemDefault = new ConnectionPool(5); - } +public class ConnectionPool { + private static final int MAX_CONNECTIONS_TO_CLEANUP = 2; + private static final long DEFAULT_KEEP_ALIVE_DURATION_MS = 5 * 60 * 1000; // 5 min + + private static final ConnectionPool systemDefault; + + static { + String keepAlive = System.getProperty("http.keepAlive"); + String keepAliveDuration = System.getProperty("http.keepAliveDuration"); + String maxIdleConnections = System.getProperty("http.maxConnections"); + long keepAliveDurationMs = keepAliveDuration != null ? Long.parseLong(keepAliveDuration) + : DEFAULT_KEEP_ALIVE_DURATION_MS; + if (keepAlive != null && !Boolean.parseBoolean(keepAlive)) { + systemDefault = new ConnectionPool(0, keepAliveDurationMs); + } else if (maxIdleConnections != null) { + systemDefault = new ConnectionPool(Integer.parseInt(maxIdleConnections), keepAliveDurationMs); + } else { + systemDefault = new ConnectionPool(5, keepAliveDurationMs); } + } - /** The maximum number of idle connections for each address. */ - private final int maxConnections; - private final HashMap> connectionPool - = new HashMap>(); + /** The maximum number of idle connections for each address. */ + private final int maxIdleConnections; + private final long keepAliveDurationNs; - public ConnectionPool(int maxConnections) { - this.maxConnections = maxConnections; - } + private final LinkedList connections = new LinkedList(); - public static ConnectionPool getDefault() { - return systemDefault; - } - - /** - * Returns a recycled connection to {@code address}, or null if no such - * connection exists. - */ - public Connection get(Address address) { - // First try to reuse an existing HTTP connection. - synchronized (connectionPool) { - List connections = connectionPool.get(address); - while (connections != null) { - Connection connection = connections.get(connections.size() - 1); - boolean usable = connection.isEligibleForRecycling(); - if (usable && !connection.isSpdy()) { - try { - Platform.get().tagSocket(connection.getSocket()); - } catch (SocketException e) { - // When unable to tag, skip recycling and close - Platform.get().logW("Unable to tagSocket(): " + e); - usable = false; - } - } - - if (!connection.isSpdy() || !usable) { - connections.remove(connections.size() - 1); - if (connections.isEmpty()) { - connectionPool.remove(address); - connections = null; - } - } - - if (usable) { - return connection; - } else { - Util.closeQuietly(connection); - } - } - } - return null; - } - - /** - * Gives {@code connection} to the pool. The pool may store the connection, - * or close it, as its policy describes. - * - *

It is an error to use {@code connection} after calling this method. - */ - public void recycle(Connection connection) { - if (connection.isSpdy()) { - return; + /** We use a single background thread to cleanup expired connections. */ + private final ExecutorService executorService = + new ThreadPoolExecutor(0, 1, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue()); + private final Callable connectionsCleanupCallable = new Callable() { + @Override public Void call() throws Exception { + List expiredConnections = new ArrayList(MAX_CONNECTIONS_TO_CLEANUP); + int idleConnectionCount = 0; + synchronized (ConnectionPool.this) { + for (Iterator i = connections.descendingIterator(); i.hasNext(); ) { + Connection connection = i.next(); + if (!connection.isAlive() || connection.isExpired(keepAliveDurationNs)) { + i.remove(); + expiredConnections.add(connection); + if (expiredConnections.size() == MAX_CONNECTIONS_TO_CLEANUP) break; + } else if (connection.isIdle()) { + idleConnectionCount++; + } } + for (Iterator i = connections.descendingIterator(); i.hasNext() + && idleConnectionCount > maxIdleConnections; ) { + Connection connection = i.next(); + if (connection.isIdle()) { + expiredConnections.add(connection); + i.remove(); + --idleConnectionCount; + } + } + } + for (Connection expiredConnection : expiredConnections) { + Util.closeQuietly(expiredConnection); + } + return null; + } + }; + + public ConnectionPool(int maxIdleConnections, long keepAliveDurationMs) { + this.maxIdleConnections = maxIdleConnections; + this.keepAliveDurationNs = keepAliveDurationMs * 1000 * 1000; + } + + /** + * Returns a snapshot of the connections in this pool, ordered from newest to + * oldest. Waits for the cleanup callable to run if it is currently scheduled. + */ + List getConnections() { + waitForCleanupCallableToRun(); + synchronized (this) { + return new ArrayList(connections); + } + } + + /** + * Blocks until the executor service has processed all currently enqueued + * jobs. + */ + private void waitForCleanupCallableToRun() { + try { + executorService.submit(new Runnable() { + @Override public void run() { + } + }).get(); + } catch (Exception e) { + throw new AssertionError(); + } + } + + public static ConnectionPool getDefault() { + return systemDefault; + } + + /** + * Returns total number of connections in the pool. + */ + public synchronized int getConnectionCount() { + return connections.size(); + } + + /** + * Returns total number of spdy connections in the pool. + */ + public synchronized int getSpdyConnectionCount() { + int total = 0; + for (Connection connection : connections) { + if (connection.isSpdy()) total++; + } + return total; + } + + /** + * Returns total number of http connections in the pool. + */ + public synchronized int getHttpConnectionCount() { + int total = 0; + for (Connection connection : connections) { + if (!connection.isSpdy()) total++; + } + return total; + } + + /** Returns a recycled connection to {@code address}, or null if no such connection exists. */ + public synchronized Connection get(Address address) { + Connection foundConnection = null; + for (Iterator i = connections.descendingIterator(); i.hasNext(); ) { + Connection connection = i.next(); + if (!connection.getAddress().equals(address) + || !connection.isAlive() + || System.nanoTime() - connection.getIdleStartTimeNs() >= keepAliveDurationNs) { + continue; + } + i.remove(); + if (!connection.isSpdy()) { try { - Platform.get().untagSocket(connection.getSocket()); + Platform.get().tagSocket(connection.getSocket()); } catch (SocketException e) { - // When unable to remove tagging, skip recycling and close - Platform.get().logW("Unable to untagSocket(): " + e); - Util.closeQuietly(connection); - return; + Util.closeQuietly(connection); + // When unable to tag, skip recycling and close + Platform.get().logW("Unable to tagSocket(): " + e); + continue; } - - if (maxConnections > 0 && connection.isEligibleForRecycling()) { - Address address = connection.getAddress(); - synchronized (connectionPool) { - List connections = connectionPool.get(address); - if (connections == null) { - connections = new ArrayList(); - connectionPool.put(address, connections); - } - if (connections.size() < maxConnections) { - connection.setRecycled(); - connections.add(connection); - return; // keep the connection open - } - } - } - - // don't close streams while holding a lock! - Util.closeQuietly(connection); + } + foundConnection = connection; + break; } - /** - * Shares the SPDY connection with the pool. Callers to this method may - * continue to use {@code connection}. - */ - public void share(Connection connection) { - if (!connection.isSpdy()) { - throw new IllegalArgumentException(); - } - if (maxConnections <= 0 || !connection.isEligibleForRecycling()) { - return; - } - Address address = connection.getAddress(); - synchronized (connectionPool) { - List connections = connectionPool.get(address); - if (connections == null) { - connections = new ArrayList(1); - connections.add(connection); - connectionPool.put(address, connections); - } - } + if (foundConnection != null && foundConnection.isSpdy()) { + connections.addFirst(foundConnection); // Add it back after iteration. } + + executorService.submit(connectionsCleanupCallable); + return foundConnection; + } + + /** + * Gives {@code connection} to the pool. The pool may store the connection, + * or close it, as its policy describes. + * + *

It is an error to use {@code connection} after calling this method. + */ + public void recycle(Connection connection) { + executorService.submit(connectionsCleanupCallable); + + if (connection.isSpdy()) { + return; + } + + if (!connection.isAlive()) { + Util.closeQuietly(connection); + return; + } + + try { + Platform.get().untagSocket(connection.getSocket()); + } catch (SocketException e) { + // When unable to remove tagging, skip recycling and close. + Platform.get().logW("Unable to untagSocket(): " + e); + Util.closeQuietly(connection); + return; + } + + synchronized (this) { + connections.addFirst(connection); + connection.resetIdleStartTime(); + } + } + + /** + * Shares the SPDY connection with the pool. Callers to this method may + * continue to use {@code connection}. + */ + public void maybeShare(Connection connection) { + executorService.submit(connectionsCleanupCallable); + if (!connection.isSpdy()) { + // Only SPDY connections are sharable. + return; + } + if (connection.isAlive()) { + synchronized (this) { + connections.addFirst(connection); + } + } + } } diff --git a/src/main/java/com/squareup/okhttp/internal/http/HttpEngine.java b/src/main/java/com/squareup/okhttp/internal/http/HttpEngine.java index 579da94a0..1a23a2af2 100644 --- a/src/main/java/com/squareup/okhttp/internal/http/HttpEngine.java +++ b/src/main/java/com/squareup/okhttp/internal/http/HttpEngine.java @@ -287,9 +287,7 @@ public class HttpEngine { if (!connection.isConnected()) { connection.connect(policy.getConnectTimeout(), policy.getReadTimeout(), getTunnelConfig()); - if (connection.isSpdy()) { - policy.connectionPool.share(connection); - } + policy.connectionPool.maybeShare(connection); } connected(connection); Proxy proxy = connection.getProxy(); @@ -383,10 +381,6 @@ public class HttpEngine { return connection; } - public final boolean hasRecycledConnection() { - return connection != null && connection.isRecycled(); - } - /** * Returns true if {@code cacheResponse} is of the right type. This * condition is necessary but not sufficient for the cached response to diff --git a/src/main/java/com/squareup/okhttp/internal/http/HttpURLConnectionImpl.java b/src/main/java/com/squareup/okhttp/internal/http/HttpURLConnectionImpl.java index f52857d93..10773a6d1 100644 --- a/src/main/java/com/squareup/okhttp/internal/http/HttpURLConnectionImpl.java +++ b/src/main/java/com/squareup/okhttp/internal/http/HttpURLConnectionImpl.java @@ -326,8 +326,6 @@ public class HttpURLConnectionImpl extends HttpURLConnection { if (retry == Retry.DIFFERENT_CONNECTION) { httpEngine.automaticallyReleaseConnectionToPool(); - } else if (retry == Retry.SAME_CONNECTION && httpEngine.getConnection() != null) { - httpEngine.getConnection().setRecycled(); } httpEngine.release(false); diff --git a/src/main/java/com/squareup/okhttp/internal/http/RouteSelector.java b/src/main/java/com/squareup/okhttp/internal/http/RouteSelector.java index ac4bb6c8c..144344a9b 100644 --- a/src/main/java/com/squareup/okhttp/internal/http/RouteSelector.java +++ b/src/main/java/com/squareup/okhttp/internal/http/RouteSelector.java @@ -37,7 +37,7 @@ import java.util.NoSuchElementException; * choice of proxy server, IP address, and TLS mode. Connections may also be * recycled. */ -final class RouteSelector { +public final class RouteSelector { /** Uses {@link com.squareup.okhttp.internal.Platform#enableTlsExtensions}. */ private static final int TLS_MODE_MODERN = 1; /** Uses {@link com.squareup.okhttp.internal.Platform#supportTlsIntolerantServer}. */ diff --git a/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java b/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java index 347be634f..70228dd1d 100644 --- a/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java +++ b/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java @@ -98,6 +98,7 @@ public final class SpdyConnection implements Closeable { private int lastGoodStreamId; private int nextStreamId; private boolean shutdown; + private long idleStartTimeNs = System.nanoTime(); /** Lazily-created map of in-flight pings awaiting a response. Guarded by this. */ private Map pings; @@ -138,7 +139,29 @@ public final class SpdyConnection implements Closeable { } synchronized SpdyStream removeStream(int streamId) { - return streams.remove(streamId); + SpdyStream stream = streams.remove(streamId); + if (stream != null && streams.isEmpty()) { + setIdle(true); + } + return stream; + } + + private void setIdle(boolean value) { + idleStartTimeNs = value ? System.nanoTime() : 0L; + } + + /** + * Returns true if this connection is idle. + */ + public boolean isIdle() { + return idleStartTimeNs != 0L; + } + + /** + * Returns the time in ns when this connection became idle or 0L if connection is not idle. + */ + public long getIdleStartTimeNs() { + return idleStartTimeNs; } /** @@ -169,6 +192,7 @@ public final class SpdyConnection implements Closeable { settings); if (stream.isOpen()) { streams.put(streamId, stream); + setIdle(false); } } @@ -334,6 +358,7 @@ public final class SpdyConnection implements Closeable { if (!streams.isEmpty()) { streamsToClose = streams.values().toArray(new SpdyStream[streams.size()]); streams.clear(); + setIdle(false); } if (pings != null) { pingsToCancel = pings.values().toArray(new Ping[pings.size()]); diff --git a/src/test/java/com/squareup/okhttp/ConnectionPoolTest.java b/src/test/java/com/squareup/okhttp/ConnectionPoolTest.java new file mode 100644 index 000000000..6a5997fb3 --- /dev/null +++ b/src/test/java/com/squareup/okhttp/ConnectionPoolTest.java @@ -0,0 +1,389 @@ +/* + * Copyright (C) 2013 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.squareup.okhttp; + +import com.google.mockwebserver.MockWebServer; +import com.squareup.okhttp.internal.RecordingHostnameVerifier; +import com.squareup.okhttp.internal.SslContextBuilder; +import com.squareup.okhttp.internal.Util; +import com.squareup.okhttp.internal.mockspdyserver.MockSpdyServer; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Proxy; +import java.net.UnknownHostException; +import java.security.GeneralSecurityException; +import java.util.Arrays; +import javax.net.ssl.SSLContext; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +public final class ConnectionPoolTest { + private static final int KEEP_ALIVE_DURATION_MS = 500; + private static final SSLContext sslContext; + static { + try { + sslContext = new SslContextBuilder(InetAddress.getLocalHost().getHostName()).build(); + } catch (GeneralSecurityException e) { + throw new RuntimeException(e); + } catch (UnknownHostException e) { + throw new RuntimeException(e); + } + } + private final MockSpdyServer spdyServer = new MockSpdyServer(sslContext.getSocketFactory()); + private InetSocketAddress spdySocketAddress; + private Address spdyAddress; + + private final MockWebServer httpServer = new MockWebServer(); + private Address httpAddress; + private InetSocketAddress httpSocketAddress; + + private Connection httpA; + private Connection httpB; + private Connection httpC; + private Connection httpD; + private Connection httpE; + private Connection spdyA; + private Connection spdyB; + + @Before public void setUp() throws Exception { + httpServer.play(); + httpAddress = new Address(httpServer.getHostName(), httpServer.getPort(), null, null, null); + httpSocketAddress = new InetSocketAddress(InetAddress.getByName(httpServer.getHostName()), + httpServer.getPort()); + + spdyServer.play(); + spdyAddress = new Address(spdyServer.getHostName(), spdyServer.getPort(), + sslContext.getSocketFactory(), new RecordingHostnameVerifier(), null); + spdySocketAddress = new InetSocketAddress(InetAddress.getByName(spdyServer.getHostName()), + spdyServer.getPort()); + + httpA = new Connection(httpAddress, Proxy.NO_PROXY, httpSocketAddress, true); + httpA.connect(100, 100, null); + httpB = new Connection(httpAddress, Proxy.NO_PROXY, httpSocketAddress, true); + httpB.connect(100, 100, null); + httpC = new Connection(httpAddress, Proxy.NO_PROXY, httpSocketAddress, true); + httpC.connect(100, 100, null); + httpD = new Connection(httpAddress, Proxy.NO_PROXY, httpSocketAddress, true); + httpD.connect(100, 100, null); + httpE = new Connection(httpAddress, Proxy.NO_PROXY, httpSocketAddress, true); + httpE.connect(100, 100, null); + spdyA = new Connection(spdyAddress, Proxy.NO_PROXY, spdySocketAddress, true); + spdyA.connect(100, 100, null); + spdyB = new Connection(spdyAddress, Proxy.NO_PROXY, spdySocketAddress, true); + spdyB.connect(100, 100, null); + } + + @After public void tearDown() throws Exception { + httpServer.shutdown(); + spdyServer.shutdown(); + + Util.closeQuietly(httpA); + Util.closeQuietly(httpB); + Util.closeQuietly(httpC); + Util.closeQuietly(httpD); + Util.closeQuietly(httpE); + Util.closeQuietly(spdyA); + Util.closeQuietly(spdyB); + } + + @Test public void poolSingleHttpConnection() throws IOException { + ConnectionPool pool = new ConnectionPool(1, KEEP_ALIVE_DURATION_MS); + Connection connection = pool.get(httpAddress); + assertNull(connection); + + connection = new Connection(httpAddress, Proxy.NO_PROXY, httpSocketAddress, true); + connection.connect(100, 100, null); + assertEquals(0, pool.getConnectionCount()); + pool.recycle(connection); + assertEquals(1, pool.getConnectionCount()); + assertEquals(1, pool.getHttpConnectionCount()); + assertEquals(0, pool.getSpdyConnectionCount()); + + Connection recycledConnection = pool.get(httpAddress); + assertEquals(connection, recycledConnection); + assertTrue(recycledConnection.isAlive()); + + recycledConnection = pool.get(httpAddress); + assertNull(recycledConnection); + } + + @Test public void poolPrefersMostRecentlyRecycled() throws Exception { + ConnectionPool pool = new ConnectionPool(2, KEEP_ALIVE_DURATION_MS); + pool.recycle(httpA); + pool.recycle(httpB); + pool.recycle(httpC); + assertPooled(pool, httpC, httpB); + } + + @Test public void getSpdyConnection() throws Exception { + ConnectionPool pool = new ConnectionPool(2, KEEP_ALIVE_DURATION_MS); + pool.maybeShare(spdyA); + assertSame(spdyA, pool.get(spdyAddress)); + assertPooled(pool, spdyA); + } + + @Test public void getHttpConnection() throws Exception { + ConnectionPool pool = new ConnectionPool(2, KEEP_ALIVE_DURATION_MS); + pool.recycle(httpA); + assertSame(httpA, pool.get(httpAddress)); + assertPooled(pool); + } + + @Test public void idleConnectionNotReturned() throws Exception { + ConnectionPool pool = new ConnectionPool(2, KEEP_ALIVE_DURATION_MS); + pool.recycle(httpA); + Thread.sleep(KEEP_ALIVE_DURATION_MS * 2); + assertNull(pool.get(httpAddress)); + assertPooled(pool); + } + + @Test public void maxIdleConnectionLimitIsEnforced() throws Exception { + ConnectionPool pool = new ConnectionPool(2, KEEP_ALIVE_DURATION_MS); + pool.recycle(httpA); + pool.recycle(httpB); + pool.recycle(httpC); + pool.recycle(httpD); + assertPooled(pool, httpD, httpC); + } + + @Test public void expiredConnectionsAreEvicted() throws Exception { + ConnectionPool pool = new ConnectionPool(2, KEEP_ALIVE_DURATION_MS); + pool.recycle(httpA); + pool.recycle(httpB); + Thread.sleep(2 * KEEP_ALIVE_DURATION_MS); + pool.get(spdyAddress); // Force the cleanup callable to run. + assertPooled(pool); + } + + @Test public void nonAliveConnectionNotReturned() throws Exception { + ConnectionPool pool = new ConnectionPool(2, KEEP_ALIVE_DURATION_MS); + pool.recycle(httpA); + httpA.close(); + assertNull(pool.get(httpAddress)); + assertPooled(pool); + } + + @Test public void differentAddressConnectionNotReturned() throws Exception { + ConnectionPool pool = new ConnectionPool(2, KEEP_ALIVE_DURATION_MS); + pool.recycle(httpA); + assertNull(pool.get(spdyAddress)); + assertPooled(pool, httpA); + } + + @Test public void gettingSpdyConnectionPromotesItToFrontOfQueue() throws Exception { + ConnectionPool pool = new ConnectionPool(2, KEEP_ALIVE_DURATION_MS); + pool.maybeShare(spdyA); + pool.recycle(httpA); + assertPooled(pool, httpA, spdyA); + assertSame(spdyA, pool.get(spdyAddress)); + assertPooled(pool, spdyA, httpA); + } + + @Test public void gettingConnectionReturnsOldestFirst() throws Exception { + ConnectionPool pool = new ConnectionPool(2, KEEP_ALIVE_DURATION_MS); + pool.recycle(httpA); + pool.recycle(httpB); + assertSame(httpA, pool.get(httpAddress)); + } + + @Test public void recyclingNonAliveConnectionClosesThatConnection() throws Exception { + ConnectionPool pool = new ConnectionPool(2, KEEP_ALIVE_DURATION_MS); + httpA.getSocket().shutdownInput(); + pool.recycle(httpA); // Should close httpA. + assertTrue(httpA.getSocket().isClosed()); + } + + @Test public void shareHttpConnectionDoesNothing() throws Exception { + ConnectionPool pool = new ConnectionPool(2, KEEP_ALIVE_DURATION_MS); + pool.maybeShare(httpA); + assertPooled(pool); + } + + @Test public void recycleSpdyConnectionDoesNothing() throws Exception { + ConnectionPool pool = new ConnectionPool(2, KEEP_ALIVE_DURATION_MS); + pool.recycle(spdyA); + assertPooled(pool); + } + + @Test public void validateIdleSpdyConnectionTimeout() throws Exception { + ConnectionPool pool = new ConnectionPool(2, KEEP_ALIVE_DURATION_MS); + pool.maybeShare(spdyA); + Thread.sleep((int)(KEEP_ALIVE_DURATION_MS * 0.7)); + assertNull(pool.get(httpAddress)); + assertPooled(pool, spdyA); // Connection should still be in the pool. + Thread.sleep((int)(KEEP_ALIVE_DURATION_MS * 0.4)); + assertNull(pool.get(httpAddress)); + assertPooled(pool); + } + + @Test public void validateIdleHttpConnectionTimeout() throws Exception { + ConnectionPool pool = new ConnectionPool(2, KEEP_ALIVE_DURATION_MS); + pool.recycle(httpA); + Thread.sleep((int)(KEEP_ALIVE_DURATION_MS * 0.7)); + assertNull(pool.get(spdyAddress)); + assertPooled(pool, httpA); // Connection should still be in the pool. + Thread.sleep((int)(KEEP_ALIVE_DURATION_MS * 0.4)); + assertNull(pool.get(spdyAddress)); + assertPooled(pool); + } + + @Test public void maxConnections() throws IOException, InterruptedException { + ConnectionPool pool = new ConnectionPool(2, KEEP_ALIVE_DURATION_MS); + + // Pool should be empty. + assertEquals(0, pool.getConnectionCount()); + + // http A should be added to the pool. + pool.recycle(httpA); + assertEquals(1, pool.getConnectionCount()); + assertEquals(1, pool.getHttpConnectionCount()); + assertEquals(0, pool.getSpdyConnectionCount()); + + // http B should be added to the pool. + pool.recycle(httpB); + assertEquals(2, pool.getConnectionCount()); + assertEquals(2, pool.getHttpConnectionCount()); + assertEquals(0, pool.getSpdyConnectionCount()); + + // http C should be added and http A should be removed. + pool.recycle(httpC); + Thread.sleep(50); + assertEquals(2, pool.getConnectionCount()); + assertEquals(2, pool.getHttpConnectionCount()); + assertEquals(0, pool.getSpdyConnectionCount()); + + // spdy A should be added and http B should be removed. + pool.maybeShare(spdyA); + Thread.sleep(50); + assertEquals(2, pool.getConnectionCount()); + assertEquals(1, pool.getHttpConnectionCount()); + assertEquals(1, pool.getSpdyConnectionCount()); + + // http C should be removed from the pool. + Connection recycledHttpConnection = pool.get(httpAddress); + assertNotNull(recycledHttpConnection); + assertTrue(recycledHttpConnection.isAlive()); + assertEquals(1, pool.getConnectionCount()); + assertEquals(0, pool.getHttpConnectionCount()); + assertEquals(1, pool.getSpdyConnectionCount()); + + // spdy A will be returned and kept in the pool. + Connection sharedSpdyConnection = pool.get(spdyAddress); + assertNotNull(sharedSpdyConnection); + assertEquals(spdyA, sharedSpdyConnection); + assertEquals(1, pool.getConnectionCount()); + assertEquals(0, pool.getHttpConnectionCount()); + assertEquals(1, pool.getSpdyConnectionCount()); + + // Nothing should change. + pool.recycle(httpC); + Thread.sleep(50); + assertEquals(2, pool.getConnectionCount()); + assertEquals(1, pool.getHttpConnectionCount()); + assertEquals(1, pool.getSpdyConnectionCount()); + + // Nothing should change. + pool.maybeShare(spdyB); + Thread.sleep(50); + assertEquals(2, pool.getConnectionCount()); + assertEquals(1, pool.getHttpConnectionCount()); + assertEquals(1, pool.getSpdyConnectionCount()); + + // An http connection should be removed from the pool. + recycledHttpConnection = pool.get(httpAddress); + assertNotNull(recycledHttpConnection); + assertTrue(recycledHttpConnection.isAlive()); + assertEquals(1, pool.getConnectionCount()); + assertEquals(0, pool.getHttpConnectionCount()); + assertEquals(1, pool.getSpdyConnectionCount()); + + // Shouldn't change numbers because spdyConnections A and B user the same server address. + pool.maybeShare(spdyB); + Thread.sleep(50); + assertEquals(1, pool.getConnectionCount()); + assertEquals(0, pool.getHttpConnectionCount()); + assertEquals(1, pool.getSpdyConnectionCount()); + + // spdy A will be returned and kept in the pool. Pool shouldn't change. + sharedSpdyConnection = pool.get(spdyAddress); + assertEquals(spdyA, sharedSpdyConnection); + assertNotNull(sharedSpdyConnection); + assertEquals(1, pool.getConnectionCount()); + assertEquals(0, pool.getHttpConnectionCount()); + assertEquals(1, pool.getSpdyConnectionCount()); + + // http D should be added to the pool. + pool.recycle(httpD); + Thread.sleep(50); + assertEquals(2, pool.getConnectionCount()); + assertEquals(1, pool.getHttpConnectionCount()); + assertEquals(1, pool.getSpdyConnectionCount()); + + // http E should be added to the pool. spdy A should be removed from the pool. + pool.recycle(httpE); + Thread.sleep(50); + assertEquals(2, pool.getConnectionCount()); + assertEquals(2, pool.getHttpConnectionCount()); + assertEquals(0, pool.getSpdyConnectionCount()); + } + + @Test public void connectionCleanup() throws IOException, InterruptedException { + ConnectionPool pool = new ConnectionPool(10, KEEP_ALIVE_DURATION_MS); + + // Add 3 connections to the pool. + pool.recycle(httpA); + pool.recycle(httpB); + pool.maybeShare(spdyA); + assertEquals(3, pool.getConnectionCount()); + assertEquals(2, pool.getHttpConnectionCount()); + assertEquals(1, pool.getSpdyConnectionCount()); + + // Kill http A. + Util.closeQuietly(httpA); + + // Force pool to run a clean up. + assertNotNull(pool.get(spdyAddress)); + Thread.sleep(50); + + assertEquals(2, pool.getConnectionCount()); + assertEquals(1, pool.getHttpConnectionCount()); + assertEquals(1, pool.getSpdyConnectionCount()); + + Thread.sleep(KEEP_ALIVE_DURATION_MS); + // Force pool to run a clean up. + assertNull(pool.get(httpAddress)); + assertNull(pool.get(spdyAddress)); + + Thread.sleep(50); + + assertEquals(0, pool.getConnectionCount()); + assertEquals(0, pool.getHttpConnectionCount()); + assertEquals(0, pool.getSpdyConnectionCount()); + } + + private void assertPooled(ConnectionPool pool, Connection... connections) throws Exception { + assertEquals(Arrays.asList(connections), pool.getConnections()); + } + +}