From bc3ad111ad01100a77846f7dc433b0c0f5b58dba Mon Sep 17 00:00:00 2001 From: Jesse Wilson Date: Mon, 23 Sep 2019 22:36:05 -0400 Subject: [PATCH] Adopt TaskRunner in RealConnectionPool This also configures tests to assert that the connection pool isn't doing any work after the test completes. --- .../main/java/okhttp3/OkHttpClientTestRule.kt | 27 ++++ .../src/main/java/okhttp3/ConnectionPool.kt | 18 ++- .../java/okhttp3/internal/concurrent/Task.kt | 3 +- .../okhttp3/internal/concurrent/TaskQueue.kt | 2 +- .../okhttp3/internal/concurrent/TaskRunner.kt | 13 +- .../internal/connection/RealConnectionPool.kt | 58 ++----- .../concurrent/TaskRunnerRealBackendTest.kt | 4 +- .../internal/concurrent/TaskRunnerTest.kt | 153 +++++++++--------- .../connection/ConnectionPoolTest.java | 38 +++-- 9 files changed, 163 insertions(+), 153 deletions(-) diff --git a/okhttp-testing-support/src/main/java/okhttp3/OkHttpClientTestRule.kt b/okhttp-testing-support/src/main/java/okhttp3/OkHttpClientTestRule.kt index 6d3484807..a39b3cc9a 100644 --- a/okhttp-testing-support/src/main/java/okhttp3/OkHttpClientTestRule.kt +++ b/okhttp-testing-support/src/main/java/okhttp3/OkHttpClientTestRule.kt @@ -15,6 +15,9 @@ */ package okhttp3 +import okhttp3.internal.concurrent.Task +import okhttp3.internal.concurrent.TaskQueue +import okhttp3.internal.concurrent.TaskRunner import okhttp3.testing.Flaky import org.assertj.core.api.Assertions.assertThat import org.junit.rules.TestRule @@ -22,6 +25,8 @@ import org.junit.runner.Description import org.junit.runners.model.Statement import java.net.InetAddress import java.util.concurrent.ConcurrentLinkedDeque +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit /** Apply this rule to tests that need an OkHttpClient instance. */ class OkHttpClientTestRule : TestRule { @@ -59,6 +64,14 @@ class OkHttpClientTestRule : TestRule { } } + private fun ensureAllTaskQueuesIdle() { + for (queue in TaskRunner.INSTANCE.activeQueues()) { + assertThat(queue.awaitIdle(500L, TimeUnit.MILLISECONDS)) + .withFailMessage("Queue ${queue.owner} still active after 500ms") + .isTrue() + } + } + override fun apply(base: Statement, description: Description): Statement { return object : Statement() { override fun evaluate() { @@ -72,6 +85,7 @@ class OkHttpClientTestRule : TestRule { } finally { ensureAllConnectionsReleased() releaseClient() + ensureAllTaskQueuesIdle() } } @@ -119,6 +133,19 @@ class OkHttpClientTestRule : TestRule { } } + /** Returns true if this queue became idle before the timeout elapsed. */ + private fun TaskQueue.awaitIdle(timeout: Long, timeUnit: TimeUnit): Boolean { + val latch = CountDownLatch(1) + schedule(object : Task("awaitIdle") { + override fun runOnce(): Long { + latch.countDown() + return -1L + } + }) + + return latch.await(timeout, timeUnit) + } + companion object { /** * Quick and dirty pool of OkHttpClient instances. Each has its own independent dispatcher and diff --git a/okhttp/src/main/java/okhttp3/ConnectionPool.kt b/okhttp/src/main/java/okhttp3/ConnectionPool.kt index 3bdc49139..4dbcb9514 100644 --- a/okhttp/src/main/java/okhttp3/ConnectionPool.kt +++ b/okhttp/src/main/java/okhttp3/ConnectionPool.kt @@ -16,6 +16,7 @@ */ package okhttp3 +import okhttp3.internal.concurrent.TaskRunner import okhttp3.internal.connection.RealConnectionPool import java.util.concurrent.TimeUnit @@ -29,12 +30,19 @@ import java.util.concurrent.TimeUnit * Currently this pool holds up to 5 idle connections which will be evicted after 5 minutes of * inactivity. */ -class ConnectionPool( - maxIdleConnections: Int, - keepAliveDuration: Long, - timeUnit: TimeUnit +class ConnectionPool internal constructor( + internal val delegate: RealConnectionPool ) { - internal val delegate = RealConnectionPool(maxIdleConnections, keepAliveDuration, timeUnit) + constructor( + maxIdleConnections: Int, + keepAliveDuration: Long, + timeUnit: TimeUnit + ) : this(RealConnectionPool( + taskRunner = TaskRunner.INSTANCE, + maxIdleConnections = maxIdleConnections, + keepAliveDuration = keepAliveDuration, + timeUnit = timeUnit + )) constructor() : this(5, 5, TimeUnit.MINUTES) diff --git a/okhttp/src/main/java/okhttp3/internal/concurrent/Task.kt b/okhttp/src/main/java/okhttp3/internal/concurrent/Task.kt index 05fa3fd16..64c836b73 100644 --- a/okhttp/src/main/java/okhttp3/internal/concurrent/Task.kt +++ b/okhttp/src/main/java/okhttp3/internal/concurrent/Task.kt @@ -42,8 +42,7 @@ package okhttp3.internal.concurrent * within it never execute concurrently. It is an error to use a task in multiple queues. */ abstract class Task( - val name: String, - val daemon: Boolean = true + val name: String ) { // Guarded by the TaskRunner. internal var queue: TaskQueue? = null diff --git a/okhttp/src/main/java/okhttp3/internal/concurrent/TaskQueue.kt b/okhttp/src/main/java/okhttp3/internal/concurrent/TaskQueue.kt index 783818006..b0ce4dda6 100644 --- a/okhttp/src/main/java/okhttp3/internal/concurrent/TaskQueue.kt +++ b/okhttp/src/main/java/okhttp3/internal/concurrent/TaskQueue.kt @@ -62,7 +62,7 @@ class TaskQueue internal constructor( * is running when that time is reached, that task is allowed to complete before this task is * started. Similarly the task will be delayed if the host lacks compute resources. */ - fun schedule(task: Task, delayNanos: Long) { + fun schedule(task: Task, delayNanos: Long = 0L) { task.initQueue(this) synchronized(taskRunner) { diff --git a/okhttp/src/main/java/okhttp3/internal/concurrent/TaskRunner.kt b/okhttp/src/main/java/okhttp3/internal/concurrent/TaskRunner.kt index 3807d5d8a..cbc202d08 100644 --- a/okhttp/src/main/java/okhttp3/internal/concurrent/TaskRunner.kt +++ b/okhttp/src/main/java/okhttp3/internal/concurrent/TaskRunner.kt @@ -19,6 +19,7 @@ import okhttp3.internal.addIfAbsent import okhttp3.internal.notify import okhttp3.internal.objectWaitNanos import okhttp3.internal.threadFactory +import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.SynchronousQueue import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit @@ -126,13 +127,13 @@ class TaskRunner( fun coordinatorWait(taskRunner: TaskRunner, nanos: Long) } - class RealBackend : Backend { + internal class RealBackend : Backend { private val coordinatorExecutor = ThreadPoolExecutor( 0, // corePoolSize. 1, // maximumPoolSize. 60L, TimeUnit.SECONDS, // keepAliveTime. - SynchronousQueue(), - threadFactory("OkHttp Task Coordinator", false) + LinkedBlockingQueue(), + threadFactory("OkHttp Task Coordinator", true) ) private val taskExecutor = ThreadPoolExecutor( @@ -161,9 +162,13 @@ class TaskRunner( taskRunner.objectWaitNanos(nanos) } - fun shutDown() { + fun shutdown() { coordinatorExecutor.shutdown() taskExecutor.shutdown() } } + + companion object { + val INSTANCE = TaskRunner(RealBackend()) + } } diff --git a/okhttp/src/main/java/okhttp3/internal/connection/RealConnectionPool.kt b/okhttp/src/main/java/okhttp3/internal/connection/RealConnectionPool.kt index 8dc3c57bc..0bd1c7fab 100644 --- a/okhttp/src/main/java/okhttp3/internal/connection/RealConnectionPool.kt +++ b/okhttp/src/main/java/okhttp3/internal/connection/RealConnectionPool.kt @@ -20,19 +20,18 @@ import okhttp3.Address import okhttp3.ConnectionPool import okhttp3.Route import okhttp3.internal.closeQuietly +import okhttp3.internal.concurrent.Task +import okhttp3.internal.concurrent.TaskQueue +import okhttp3.internal.concurrent.TaskRunner import okhttp3.internal.connection.Transmitter.TransmitterReference -import okhttp3.internal.lockAndWaitNanos -import okhttp3.internal.notifyAll import okhttp3.internal.platform.Platform -import okhttp3.internal.threadFactory import java.io.IOException import java.net.Proxy import java.util.ArrayDeque -import java.util.concurrent.SynchronousQueue -import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit class RealConnectionPool( + taskRunner: TaskRunner, /** The maximum number of idle connections for each address. */ private val maxIdleConnections: Int, keepAliveDuration: Long, @@ -40,24 +39,14 @@ class RealConnectionPool( ) { private val keepAliveDurationNs: Long = timeUnit.toNanos(keepAliveDuration) - private val cleanupRunnable = object : Runnable { - override fun run() { - while (true) { - val waitNanos = cleanup(System.nanoTime()) - if (waitNanos == -1L) return - try { - this@RealConnectionPool.lockAndWaitNanos(waitNanos) - } catch (ie: InterruptedException) { - // Will cause the thread to exit unless other connections are created! - evictAll() - } - } - } + private val cleanupQueue: TaskQueue = taskRunner.newQueue(this) + private val cleanupTask = object : Task("OkHttp ConnectionPool") { + override fun runOnce() = cleanup(System.nanoTime()) + override fun tryCancel() = true } private val connections = ArrayDeque() val routeDatabase = RouteDatabase() - var cleanupRunning: Boolean = false init { // Put a floor on the keep alive duration, otherwise cleanup will spin loop. @@ -98,11 +87,8 @@ class RealConnectionPool( fun put(connection: RealConnection) { assert(Thread.holdsLock(this)) - if (!cleanupRunning) { - cleanupRunning = true - executor.execute(cleanupRunnable) - } connections.add(connection) + cleanupQueue.schedule(cleanupTask) } /** @@ -113,10 +99,10 @@ class RealConnectionPool( assert(Thread.holdsLock(this)) return if (connection.noNewExchanges || maxIdleConnections == 0) { connections.remove(connection) + if (connections.isEmpty()) cleanupQueue.cancelAll() true } else { - // Awake the cleanup thread: we may have exceeded the idle connection limit. - this.notifyAll() + cleanupQueue.schedule(cleanupTask) false } } @@ -133,6 +119,7 @@ class RealConnectionPool( i.remove() } } + if (connections.isEmpty()) cleanupQueue.cancelAll() } for (connection in evictedConnections) { @@ -144,8 +131,8 @@ class RealConnectionPool( * Performs maintenance on this pool, evicting the connection that has been idle the longest if * either it has exceeded the keep alive limit or the idle connections limit. * - * Returns the duration in nanos to sleep until the next scheduled call to this method. Returns - * -1 if no further cleanups are required. + * Returns the duration in nanoseconds to sleep until the next scheduled call to this method. + * Returns -1 if no further cleanups are required. */ fun cleanup(now: Long): Long { var inUseConnectionCount = 0 @@ -178,6 +165,7 @@ class RealConnectionPool( // We've found a connection to evict. Remove it from the list, then close it below // (outside of the synchronized block). connections.remove(longestIdleConnection) + if (connections.isEmpty()) cleanupQueue.cancelAll() } idleConnectionCount > 0 -> { // A connection will be ready to evict soon. @@ -190,7 +178,6 @@ class RealConnectionPool( } else -> { // No connections, idle or in use. - cleanupRunning = false return -1 } } @@ -199,7 +186,7 @@ class RealConnectionPool( longestIdleConnection!!.socket().closeQuietly() // Cleanup again immediately. - return 0 + return 0L } /** @@ -250,19 +237,6 @@ class RealConnectionPool( } companion object { - /** - * Background threads are used to cleanup expired connections. There will be at most a single - * thread running per connection pool. The thread pool executor permits the pool itself to be - * garbage collected. - */ - private val executor = ThreadPoolExecutor( - 0, // corePoolSize. - Int.MAX_VALUE, // maximumPoolSize. - 60L, TimeUnit.SECONDS, // keepAliveTime. - SynchronousQueue(), - threadFactory("OkHttp ConnectionPool", true) - ) - fun get(connectionPool: ConnectionPool): RealConnectionPool = connectionPool.delegate } } diff --git a/okhttp/src/test/java/okhttp3/internal/concurrent/TaskRunnerRealBackendTest.kt b/okhttp/src/test/java/okhttp3/internal/concurrent/TaskRunnerRealBackendTest.kt index 2ac7b13c3..e8d24f932 100644 --- a/okhttp/src/test/java/okhttp3/internal/concurrent/TaskRunnerRealBackendTest.kt +++ b/okhttp/src/test/java/okhttp3/internal/concurrent/TaskRunnerRealBackendTest.kt @@ -37,7 +37,7 @@ class TaskRunnerRealBackendTest { @Test fun test() { val t1 = System.nanoTime() / 1e6 - queue.schedule(object : Task("task", false) { + queue.schedule(object : Task("task") { val delays = mutableListOf(TimeUnit.MILLISECONDS.toNanos(1000), -1L) override fun runOnce(): Long { log.put("runOnce delays.size=${delays.size}") @@ -53,6 +53,6 @@ class TaskRunnerRealBackendTest { val t3 = System.nanoTime() / 1e6 - t1 assertThat(t3).isCloseTo(1750.0, Offset.offset(250.0)) - backend.shutDown() + backend.shutdown() } } diff --git a/okhttp/src/test/java/okhttp3/internal/concurrent/TaskRunnerTest.kt b/okhttp/src/test/java/okhttp3/internal/concurrent/TaskRunnerTest.kt index 346f2d376..ce9d9659e 100644 --- a/okhttp/src/test/java/okhttp3/internal/concurrent/TaskRunnerTest.kt +++ b/okhttp/src/test/java/okhttp3/internal/concurrent/TaskRunnerTest.kt @@ -36,32 +36,28 @@ class TaskRunnerTest { private val blueQueue = taskRunner.newQueue("blue") private val greenQueue = taskRunner.newQueue("green") - init { - backend.taskRunner = taskRunner - } - @Test fun executeDelayed() { - redQueue.schedule(object : Task("task", false) { + redQueue.schedule(object : Task("task") { override fun runOnce(): Long { log += "run@${backend.nanoTime()}" return -1L } }, 100L) - backend.advanceUntil(0L) + backend.advanceUntil(taskRunner, 0L) assertThat(log).containsExactly() - backend.advanceUntil(99L) + backend.advanceUntil(taskRunner, 99L) assertThat(log).containsExactly() - backend.advanceUntil(100L) + backend.advanceUntil(taskRunner, 100L) assertThat(log).containsExactly("run@100") backend.assertNoMoreTasks() } @Test fun executeRepeated() { - redQueue.schedule(object : Task("task", false) { + redQueue.schedule(object : Task("task") { val delays = mutableListOf(50L, 150L, -1L) override fun runOnce(): Long { log += "run@${backend.nanoTime()}" @@ -69,19 +65,19 @@ class TaskRunnerTest { } }, 100L) - backend.advanceUntil(0L) + backend.advanceUntil(taskRunner, 0L) assertThat(log).containsExactly() - backend.advanceUntil(100L) + backend.advanceUntil(taskRunner, 100L) assertThat(log).containsExactly("run@100") - backend.advanceUntil(150L) + backend.advanceUntil(taskRunner, 150L) assertThat(log).containsExactly("run@100", "run@150") - backend.advanceUntil(299L) + backend.advanceUntil(taskRunner, 299L) assertThat(log).containsExactly("run@100", "run@150") - backend.advanceUntil(300L) + backend.advanceUntil(taskRunner, 300L) assertThat(log).containsExactly("run@100", "run@150", "run@300") backend.assertNoMoreTasks() @@ -89,7 +85,7 @@ class TaskRunnerTest { /** Repeat with a delay of 200 but schedule with a delay of 50. The schedule wins. */ @Test fun executeScheduledEarlierReplacesRepeatedLater() { - redQueue.schedule(object : Task("task", false) { + redQueue.schedule(object : Task("task") { val schedules = mutableListOf(50L) val delays = mutableListOf(200L, -1L) override fun runOnce(): Long { @@ -101,13 +97,13 @@ class TaskRunnerTest { } }, 100L) - backend.advanceUntil(0L) + backend.advanceUntil(taskRunner, 0L) assertThat(log).isEmpty() - backend.advanceUntil(100L) + backend.advanceUntil(taskRunner, 100L) assertThat(log).containsExactly("run@100") - backend.advanceUntil(150L) + backend.advanceUntil(taskRunner, 150L) assertThat(log).containsExactly("run@100", "run@150") backend.assertNoMoreTasks() @@ -115,7 +111,7 @@ class TaskRunnerTest { /** Schedule with a delay of 200 but repeat with a delay of 50. The repeat wins. */ @Test fun executeRepeatedEarlierReplacesScheduledLater() { - redQueue.schedule(object : Task("task", false) { + redQueue.schedule(object : Task("task") { val schedules = mutableListOf(200L) val delays = mutableListOf(50L, -1L) override fun runOnce(): Long { @@ -127,20 +123,20 @@ class TaskRunnerTest { } }, 100L) - backend.advanceUntil(0L) + backend.advanceUntil(taskRunner, 0L) assertThat(log).isEmpty() - backend.advanceUntil(100L) + backend.advanceUntil(taskRunner, 100L) assertThat(log).containsExactly("run@100") - backend.advanceUntil(150L) + backend.advanceUntil(taskRunner, 150L) assertThat(log).containsExactly("run@100", "run@150") backend.assertNoMoreTasks() } @Test fun cancelReturnsTruePreventsNextExecution() { - redQueue.schedule(object : Task("task", false) { + redQueue.schedule(object : Task("task") { override fun runOnce(): Long { log += "run@${backend.nanoTime()}" return -1L @@ -152,19 +148,19 @@ class TaskRunnerTest { } }, 100L) - backend.advanceUntil(0L) + backend.advanceUntil(taskRunner, 0L) assertThat(log).isEmpty() redQueue.cancelAll() - backend.advanceUntil(99L) + backend.advanceUntil(taskRunner, 99L) assertThat(log).containsExactly("cancel@99") backend.assertNoMoreTasks() } @Test fun cancelReturnsFalseDoesNotCancel() { - redQueue.schedule(object : Task("task", false) { + redQueue.schedule(object : Task("task") { override fun runOnce(): Long { log += "run@${backend.nanoTime()}" return -1L @@ -176,22 +172,22 @@ class TaskRunnerTest { } }, 100L) - backend.advanceUntil(0L) + backend.advanceUntil(taskRunner, 0L) assertThat(log).isEmpty() redQueue.cancelAll() - backend.advanceUntil(99L) + backend.advanceUntil(taskRunner, 99L) assertThat(log).containsExactly("cancel@99") - backend.advanceUntil(100L) + backend.advanceUntil(taskRunner, 100L) assertThat(log).containsExactly("cancel@99", "run@100") backend.assertNoMoreTasks() } @Test fun cancelWhileExecutingPreventsRepeat() { - redQueue.schedule(object : Task("task", false) { + redQueue.schedule(object : Task("task") { override fun runOnce(): Long { log += "run@${backend.nanoTime()}" redQueue.cancelAll() @@ -204,17 +200,17 @@ class TaskRunnerTest { } }, 100L) - backend.advanceUntil(0L) + backend.advanceUntil(taskRunner, 0L) assertThat(log).isEmpty() - backend.advanceUntil(100L) + backend.advanceUntil(taskRunner, 100L) assertThat(log).containsExactly("run@100", "cancel@100") backend.assertNoMoreTasks() } @Test fun cancelWhileExecutingDoesNothingIfTaskDoesNotRepeat() { - redQueue.schedule(object : Task("task", false) { + redQueue.schedule(object : Task("task") { override fun runOnce(): Long { log += "run@${backend.nanoTime()}" redQueue.cancelAll() @@ -227,17 +223,17 @@ class TaskRunnerTest { } }, 100L) - backend.advanceUntil(0L) + backend.advanceUntil(taskRunner, 0L) assertThat(log).isEmpty() - backend.advanceUntil(100L) + backend.advanceUntil(taskRunner, 100L) assertThat(log).containsExactly("run@100") backend.assertNoMoreTasks() } @Test fun interruptingCoordinatorAttemptsToCancelsAndSucceeds() { - redQueue.schedule(object : Task("task", false) { + redQueue.schedule(object : Task("task") { override fun runOnce(): Long { log += "run@${backend.nanoTime()}" return -1L @@ -249,19 +245,19 @@ class TaskRunnerTest { } }, 100L) - backend.advanceUntil(0L) + backend.advanceUntil(taskRunner, 0L) assertThat(log).isEmpty() - backend.interruptCoordinatorThread() + backend.interruptCoordinatorThread(taskRunner) - backend.advanceUntil(0L) + backend.advanceUntil(taskRunner, 0L) assertThat(log).containsExactly("cancel@0") backend.assertNoMoreTasks() } @Test fun interruptingCoordinatorAttemptsToCancelsAndFails() { - redQueue.schedule(object : Task("task", false) { + redQueue.schedule(object : Task("task") { override fun runOnce(): Long { log += "run@${backend.nanoTime()}" return -1L @@ -273,15 +269,15 @@ class TaskRunnerTest { } }, 100L) - backend.advanceUntil(0L) + backend.advanceUntil(taskRunner, 0L) assertThat(log).isEmpty() - backend.interruptCoordinatorThread() + backend.interruptCoordinatorThread(taskRunner) - backend.advanceUntil(0L) + backend.advanceUntil(taskRunner, 0L) assertThat(log).containsExactly("cancel@0") - backend.advanceUntil(100L) + backend.advanceUntil(taskRunner, 100L) assertThat(log).containsExactly("cancel@0", "run@100") backend.assertNoMoreTasks() @@ -289,31 +285,31 @@ class TaskRunnerTest { /** Inspect how many runnables have been enqueued. If none then we're truly sequential. */ @Test fun singleQueueIsSerial() { - redQueue.schedule(object : Task("task one", false) { + redQueue.schedule(object : Task("task one") { override fun runOnce(): Long { log += "one:run@${backend.nanoTime()} tasksSize=${backend.tasksSize}" return -1L } }, 100L) - redQueue.schedule(object : Task("task two", false) { + redQueue.schedule(object : Task("task two") { override fun runOnce(): Long { log += "two:run@${backend.nanoTime()} tasksSize=${backend.tasksSize}" return -1L } }, 100L) - redQueue.schedule(object : Task("task three", false) { + redQueue.schedule(object : Task("task three") { override fun runOnce(): Long { log += "three:run@${backend.nanoTime()} tasksSize=${backend.tasksSize}" return -1L } }, 100L) - backend.advanceUntil(0L) + backend.advanceUntil(taskRunner, 0L) assertThat(log).isEmpty() - backend.advanceUntil(100L) + backend.advanceUntil(taskRunner, 100L) assertThat(log).containsExactly( "one:run@100 tasksSize=0", "two:run@100 tasksSize=0", @@ -325,31 +321,31 @@ class TaskRunnerTest { /** Inspect how many runnables have been enqueued. If non-zero then we're truly parallel. */ @Test fun differentQueuesAreParallel() { - redQueue.schedule(object : Task("task one", false) { + redQueue.schedule(object : Task("task one") { override fun runOnce(): Long { log += "one:run@${backend.nanoTime()} tasksSize=${backend.tasksSize}" return -1L } }, 100L) - blueQueue.schedule(object : Task("task two", false) { + blueQueue.schedule(object : Task("task two") { override fun runOnce(): Long { log += "two:run@${backend.nanoTime()} tasksSize=${backend.tasksSize}" return -1L } }, 100L) - greenQueue.schedule(object : Task("task three", false) { + greenQueue.schedule(object : Task("task three") { override fun runOnce(): Long { log += "three:run@${backend.nanoTime()} tasksSize=${backend.tasksSize}" return -1L } }, 100L) - backend.advanceUntil(0L) + backend.advanceUntil(taskRunner, 0L) assertThat(log).isEmpty() - backend.advanceUntil(100L) + backend.advanceUntil(taskRunner, 100L) assertThat(log).containsExactly( "one:run@100 tasksSize=2", "two:run@100 tasksSize=1", @@ -361,13 +357,13 @@ class TaskRunnerTest { /** Test the introspection method [TaskQueue.scheduledTasks]. */ @Test fun scheduledTasks() { - redQueue.schedule(object : Task("task one", false) { + redQueue.schedule(object : Task("task one") { override fun runOnce(): Long = -1L override fun toString() = "one" }, 100L) - redQueue.schedule(object : Task("task two", false) { + redQueue.schedule(object : Task("task two") { override fun runOnce(): Long = -1L override fun toString() = "two" @@ -381,7 +377,7 @@ class TaskRunnerTest { * cumbersome to implement properly because the active task might be a cancel. */ @Test fun scheduledTasksDoesNotIncludeRunningTask() { - redQueue.schedule(object : Task("task one", false) { + redQueue.schedule(object : Task("task one") { val schedules = mutableListOf(200L) override fun runOnce(): Long { if (schedules.isNotEmpty()) { @@ -394,7 +390,7 @@ class TaskRunnerTest { override fun toString() = "one" }, 100L) - redQueue.schedule(object : Task("task two", false) { + redQueue.schedule(object : Task("task two") { override fun runOnce(): Long { log += "scheduledTasks=${redQueue.scheduledTasks}" return -1L @@ -403,18 +399,18 @@ class TaskRunnerTest { override fun toString() = "two" }, 200L) - backend.advanceUntil(100L) + backend.advanceUntil(taskRunner, 100L) assertThat(log).containsExactly( "scheduledTasks=[two, one]" ) - backend.advanceUntil(200L) + backend.advanceUntil(taskRunner, 200L) assertThat(log).containsExactly( "scheduledTasks=[two, one]", "scheduledTasks=[one]" ) - backend.advanceUntil(300L) + backend.advanceUntil(taskRunner, 300L) assertThat(log).containsExactly( "scheduledTasks=[two, one]", "scheduledTasks=[one]", @@ -430,42 +426,42 @@ class TaskRunnerTest { * queues have work scheduled. */ @Test fun activeQueuesContainsOnlyQueuesWithScheduledTasks() { - redQueue.schedule(object : Task("task one", false) { + redQueue.schedule(object : Task("task one") { override fun runOnce() = -1L }, 100L) - blueQueue.schedule(object : Task("task two", false) { + blueQueue.schedule(object : Task("task two") { override fun runOnce() = -1L }, 200L) - backend.advanceUntil(0L) + backend.advanceUntil(taskRunner, 0L) assertThat(taskRunner.activeQueues()).containsExactly(redQueue, blueQueue) - backend.advanceUntil(100L) + backend.advanceUntil(taskRunner, 100L) assertThat(taskRunner.activeQueues()).containsExactly(blueQueue) - backend.advanceUntil(200L) + backend.advanceUntil(taskRunner, 200L) assertThat(taskRunner.activeQueues()).isEmpty() backend.assertNoMoreTasks() } @Test fun taskNameIsUsedForThreadNameWhenRunning() { - redQueue.schedule(object : Task("lucky task", false) { + redQueue.schedule(object : Task("lucky task") { override fun runOnce(): Long { log += "run threadName:${Thread.currentThread().name}" return -1L } - }, 0L) + }) - backend.advanceUntil(0L) + backend.advanceUntil(taskRunner, 0L) assertThat(log).containsExactly("run threadName:lucky task") backend.assertNoMoreTasks() } @Test fun taskNameIsUsedForThreadNameWhenCanceling() { - redQueue.schedule(object : Task("lucky task", false) { + redQueue.schedule(object : Task("lucky task") { override fun tryCancel(): Boolean { log += "cancel threadName:${Thread.currentThread().name}" return true @@ -474,12 +470,12 @@ class TaskRunnerTest { override fun runOnce() = -1L }, 100L) - backend.advanceUntil(0L) + backend.advanceUntil(taskRunner, 0L) assertThat(log).isEmpty() redQueue.cancelAll() - backend.advanceUntil(0L) + backend.advanceUntil(taskRunner, 0L) assertThat(log).containsExactly("cancel threadName:lucky task") backend.assertNoMoreTasks() @@ -498,9 +494,6 @@ class TaskRunnerTest { /** How many tasks can be executed immediately. */ val tasksSize: Int get() = tasks.size - /** The task runner to lock on. */ - lateinit var taskRunner: TaskRunner - /** Guarded by taskRunner. */ private var nanoTime = 0L @@ -513,12 +506,10 @@ class TaskRunnerTest { } override fun executeTask(runnable: Runnable) { - check(Thread.holdsLock(taskRunner)) tasks += runnable } override fun nanoTime(): Long { - check(Thread.holdsLock(taskRunner)) return nanoTime } @@ -540,14 +531,14 @@ class TaskRunnerTest { } /** Advance the simulated clock and run anything ready at the new time. */ - fun advanceUntil(newTime: Long) { + fun advanceUntil(taskRunner: TaskRunner, newTime: Long) { check(!Thread.holdsLock(taskRunner)) synchronized(taskRunner) { nanoTime = newTime while (true) { - runRunnables() + runRunnables(taskRunner) if (coordinatorWaitingUntilTime <= nanoTime) { // Let the coordinator do its business at the new time. @@ -561,7 +552,7 @@ class TaskRunnerTest { } /** Returns true if anything was executed. */ - private fun runRunnables() { + private fun runRunnables(taskRunner: TaskRunner) { check(Thread.holdsLock(taskRunner)) if (coordinatorToRun != null) { @@ -593,7 +584,7 @@ class TaskRunnerTest { assertThat(coordinatorWaitingUntilTime).isEqualTo(Long.MAX_VALUE) } - fun interruptCoordinatorThread() { + fun interruptCoordinatorThread(taskRunner: TaskRunner) { check(!Thread.holdsLock(taskRunner)) synchronized(taskRunner) { diff --git a/okhttp/src/test/java/okhttp3/internal/connection/ConnectionPoolTest.java b/okhttp/src/test/java/okhttp3/internal/connection/ConnectionPoolTest.java index e9908f89f..7f64cbea2 100644 --- a/okhttp/src/test/java/okhttp3/internal/connection/ConnectionPoolTest.java +++ b/okhttp/src/test/java/okhttp3/internal/connection/ConnectionPoolTest.java @@ -30,12 +30,16 @@ import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Route; import okhttp3.internal.RecordingOkAuthenticator; +import okhttp3.internal.concurrent.TaskRunner; +import okhttp3.internal.concurrent.TaskRunnerTest; import org.junit.Test; import static okhttp3.TestUtil.awaitGarbageCollection; import static org.assertj.core.api.Assertions.assertThat; public final class ConnectionPoolTest { + /** The fake task runner prevents the cleanup runnable from being started. */ + private final TaskRunner taskRunner = new TaskRunner(new TaskRunnerTest.FakeBackend()); private final Address addressA = newAddress("a"); private final Route routeA1 = newRoute(addressA); private final Address addressB = newAddress("b"); @@ -44,8 +48,8 @@ public final class ConnectionPoolTest { private final Route routeC1 = newRoute(addressC); @Test public void connectionsEvictedWhenIdleLongEnough() throws Exception { - RealConnectionPool pool = new RealConnectionPool(Integer.MAX_VALUE, 100L, TimeUnit.NANOSECONDS); - pool.setCleanupRunning(true); // Prevent the cleanup runnable from being started. + RealConnectionPool pool = new RealConnectionPool( + taskRunner, Integer.MAX_VALUE, 100L, TimeUnit.NANOSECONDS); RealConnection c1 = newConnection(pool, routeA1, 50L); @@ -76,9 +80,9 @@ public final class ConnectionPoolTest { } @Test public void inUseConnectionsNotEvicted() throws Exception { - ConnectionPool poolApi = new ConnectionPool(Integer.MAX_VALUE, 100L, TimeUnit.NANOSECONDS); - RealConnectionPool pool = RealConnectionPool.Companion.get(poolApi); - pool.setCleanupRunning(true); // Prevent the cleanup runnable from being started. + RealConnectionPool pool = new RealConnectionPool( + taskRunner, Integer.MAX_VALUE, 100L, TimeUnit.NANOSECONDS); + ConnectionPool poolApi = new ConnectionPool(pool); RealConnection c1 = newConnection(pool, routeA1, 50L); synchronized (pool) { @@ -108,8 +112,8 @@ public final class ConnectionPoolTest { } @Test public void cleanupPrioritizesEarliestEviction() throws Exception { - RealConnectionPool pool = new RealConnectionPool(Integer.MAX_VALUE, 100L, TimeUnit.NANOSECONDS); - pool.setCleanupRunning(true); // Prevent the cleanup runnable from being started. + RealConnectionPool pool = new RealConnectionPool( + taskRunner, Integer.MAX_VALUE, 100L, TimeUnit.NANOSECONDS); RealConnection c1 = newConnection(pool, routeA1, 75L); RealConnection c2 = newConnection(pool, routeB1, 50L); @@ -140,8 +144,8 @@ public final class ConnectionPoolTest { } @Test public void oldestConnectionsEvictedIfIdleLimitExceeded() throws Exception { - RealConnectionPool pool = new RealConnectionPool(2, 100L, TimeUnit.NANOSECONDS); - pool.setCleanupRunning(true); // Prevent the cleanup runnable from being started. + RealConnectionPool pool = new RealConnectionPool( + taskRunner, 2, 100L, TimeUnit.NANOSECONDS); RealConnection c1 = newConnection(pool, routeA1, 50L); RealConnection c2 = newConnection(pool, routeB1, 75L); @@ -164,9 +168,9 @@ public final class ConnectionPoolTest { } @Test public void leakedAllocation() throws Exception { - ConnectionPool poolApi = new ConnectionPool(Integer.MAX_VALUE, 100L, TimeUnit.NANOSECONDS); - RealConnectionPool pool = RealConnectionPool.Companion.get(poolApi); - pool.setCleanupRunning(true); // Prevent the cleanup runnable from being started. + RealConnectionPool pool = new RealConnectionPool( + taskRunner, Integer.MAX_VALUE, 100L, TimeUnit.NANOSECONDS); + ConnectionPool poolApi = new ConnectionPool(pool); RealConnection c1 = newConnection(pool, routeA1, 0L); allocateAndLeakAllocation(poolApi, c1); @@ -180,24 +184,26 @@ public final class ConnectionPoolTest { } @Test public void interruptStopsThread() throws Exception { - RealConnectionPool pool = new RealConnectionPool(2, 100L, TimeUnit.NANOSECONDS); + TaskRunner realTaskRunner = TaskRunner.Companion.getINSTANCE(); + RealConnectionPool pool = new RealConnectionPool( + realTaskRunner, 2, 100L, TimeUnit.NANOSECONDS); RealConnection c1 = newConnection(pool, routeA1, Long.MAX_VALUE); - assertThat(pool.getCleanupRunning()).isTrue(); + assertThat(realTaskRunner.activeQueues()).isNotEmpty(); Thread.sleep(100); Thread[] threads = new Thread[Thread.activeCount() * 2]; Thread.enumerate(threads); for (Thread t: threads) { - if (t != null && t.getName().equals("OkHttp ConnectionPool")) { + if (t != null && t.getName().equals("OkHttp Task Coordinator")) { t.interrupt(); } } Thread.sleep(100); - assertThat(pool.getCleanupRunning()).isFalse(); + assertThat(realTaskRunner.activeQueues()).isEmpty(); } /** Use a helper method so there's no hidden reference remaining on the stack. */