1
0
mirror of https://github.com/square/okhttp.git synced 2025-08-07 12:42:57 +03:00

Adopt TaskRunner in RealConnectionPool

This also configures tests to assert that the connection pool
isn't doing any work after the test completes.
This commit is contained in:
Jesse Wilson
2019-09-23 22:36:05 -04:00
parent 5b09969d61
commit bc3ad111ad
9 changed files with 163 additions and 153 deletions

View File

@@ -15,6 +15,9 @@
*/ */
package okhttp3 package okhttp3
import okhttp3.internal.concurrent.Task
import okhttp3.internal.concurrent.TaskQueue
import okhttp3.internal.concurrent.TaskRunner
import okhttp3.testing.Flaky import okhttp3.testing.Flaky
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
import org.junit.rules.TestRule import org.junit.rules.TestRule
@@ -22,6 +25,8 @@ import org.junit.runner.Description
import org.junit.runners.model.Statement import org.junit.runners.model.Statement
import java.net.InetAddress import java.net.InetAddress
import java.util.concurrent.ConcurrentLinkedDeque import java.util.concurrent.ConcurrentLinkedDeque
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
/** Apply this rule to tests that need an OkHttpClient instance. */ /** Apply this rule to tests that need an OkHttpClient instance. */
class OkHttpClientTestRule : TestRule { class OkHttpClientTestRule : TestRule {
@@ -59,6 +64,14 @@ class OkHttpClientTestRule : TestRule {
} }
} }
private fun ensureAllTaskQueuesIdle() {
for (queue in TaskRunner.INSTANCE.activeQueues()) {
assertThat(queue.awaitIdle(500L, TimeUnit.MILLISECONDS))
.withFailMessage("Queue ${queue.owner} still active after 500ms")
.isTrue()
}
}
override fun apply(base: Statement, description: Description): Statement { override fun apply(base: Statement, description: Description): Statement {
return object : Statement() { return object : Statement() {
override fun evaluate() { override fun evaluate() {
@@ -72,6 +85,7 @@ class OkHttpClientTestRule : TestRule {
} finally { } finally {
ensureAllConnectionsReleased() ensureAllConnectionsReleased()
releaseClient() releaseClient()
ensureAllTaskQueuesIdle()
} }
} }
@@ -119,6 +133,19 @@ class OkHttpClientTestRule : TestRule {
} }
} }
/** Returns true if this queue became idle before the timeout elapsed. */
private fun TaskQueue.awaitIdle(timeout: Long, timeUnit: TimeUnit): Boolean {
val latch = CountDownLatch(1)
schedule(object : Task("awaitIdle") {
override fun runOnce(): Long {
latch.countDown()
return -1L
}
})
return latch.await(timeout, timeUnit)
}
companion object { companion object {
/** /**
* Quick and dirty pool of OkHttpClient instances. Each has its own independent dispatcher and * Quick and dirty pool of OkHttpClient instances. Each has its own independent dispatcher and

View File

@@ -16,6 +16,7 @@
*/ */
package okhttp3 package okhttp3
import okhttp3.internal.concurrent.TaskRunner
import okhttp3.internal.connection.RealConnectionPool import okhttp3.internal.connection.RealConnectionPool
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
@@ -29,12 +30,19 @@ import java.util.concurrent.TimeUnit
* Currently this pool holds up to 5 idle connections which will be evicted after 5 minutes of * Currently this pool holds up to 5 idle connections which will be evicted after 5 minutes of
* inactivity. * inactivity.
*/ */
class ConnectionPool( class ConnectionPool internal constructor(
maxIdleConnections: Int, internal val delegate: RealConnectionPool
keepAliveDuration: Long,
timeUnit: TimeUnit
) { ) {
internal val delegate = RealConnectionPool(maxIdleConnections, keepAliveDuration, timeUnit) constructor(
maxIdleConnections: Int,
keepAliveDuration: Long,
timeUnit: TimeUnit
) : this(RealConnectionPool(
taskRunner = TaskRunner.INSTANCE,
maxIdleConnections = maxIdleConnections,
keepAliveDuration = keepAliveDuration,
timeUnit = timeUnit
))
constructor() : this(5, 5, TimeUnit.MINUTES) constructor() : this(5, 5, TimeUnit.MINUTES)

View File

@@ -42,8 +42,7 @@ package okhttp3.internal.concurrent
* within it never execute concurrently. It is an error to use a task in multiple queues. * within it never execute concurrently. It is an error to use a task in multiple queues.
*/ */
abstract class Task( abstract class Task(
val name: String, val name: String
val daemon: Boolean = true
) { ) {
// Guarded by the TaskRunner. // Guarded by the TaskRunner.
internal var queue: TaskQueue? = null internal var queue: TaskQueue? = null

View File

@@ -62,7 +62,7 @@ class TaskQueue internal constructor(
* is running when that time is reached, that task is allowed to complete before this task is * is running when that time is reached, that task is allowed to complete before this task is
* started. Similarly the task will be delayed if the host lacks compute resources. * started. Similarly the task will be delayed if the host lacks compute resources.
*/ */
fun schedule(task: Task, delayNanos: Long) { fun schedule(task: Task, delayNanos: Long = 0L) {
task.initQueue(this) task.initQueue(this)
synchronized(taskRunner) { synchronized(taskRunner) {

View File

@@ -19,6 +19,7 @@ import okhttp3.internal.addIfAbsent
import okhttp3.internal.notify import okhttp3.internal.notify
import okhttp3.internal.objectWaitNanos import okhttp3.internal.objectWaitNanos
import okhttp3.internal.threadFactory import okhttp3.internal.threadFactory
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.SynchronousQueue import java.util.concurrent.SynchronousQueue
import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
@@ -126,13 +127,13 @@ class TaskRunner(
fun coordinatorWait(taskRunner: TaskRunner, nanos: Long) fun coordinatorWait(taskRunner: TaskRunner, nanos: Long)
} }
class RealBackend : Backend { internal class RealBackend : Backend {
private val coordinatorExecutor = ThreadPoolExecutor( private val coordinatorExecutor = ThreadPoolExecutor(
0, // corePoolSize. 0, // corePoolSize.
1, // maximumPoolSize. 1, // maximumPoolSize.
60L, TimeUnit.SECONDS, // keepAliveTime. 60L, TimeUnit.SECONDS, // keepAliveTime.
SynchronousQueue(), LinkedBlockingQueue<Runnable>(),
threadFactory("OkHttp Task Coordinator", false) threadFactory("OkHttp Task Coordinator", true)
) )
private val taskExecutor = ThreadPoolExecutor( private val taskExecutor = ThreadPoolExecutor(
@@ -161,9 +162,13 @@ class TaskRunner(
taskRunner.objectWaitNanos(nanos) taskRunner.objectWaitNanos(nanos)
} }
fun shutDown() { fun shutdown() {
coordinatorExecutor.shutdown() coordinatorExecutor.shutdown()
taskExecutor.shutdown() taskExecutor.shutdown()
} }
} }
companion object {
val INSTANCE = TaskRunner(RealBackend())
}
} }

View File

@@ -20,19 +20,18 @@ import okhttp3.Address
import okhttp3.ConnectionPool import okhttp3.ConnectionPool
import okhttp3.Route import okhttp3.Route
import okhttp3.internal.closeQuietly import okhttp3.internal.closeQuietly
import okhttp3.internal.concurrent.Task
import okhttp3.internal.concurrent.TaskQueue
import okhttp3.internal.concurrent.TaskRunner
import okhttp3.internal.connection.Transmitter.TransmitterReference import okhttp3.internal.connection.Transmitter.TransmitterReference
import okhttp3.internal.lockAndWaitNanos
import okhttp3.internal.notifyAll
import okhttp3.internal.platform.Platform import okhttp3.internal.platform.Platform
import okhttp3.internal.threadFactory
import java.io.IOException import java.io.IOException
import java.net.Proxy import java.net.Proxy
import java.util.ArrayDeque import java.util.ArrayDeque
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
class RealConnectionPool( class RealConnectionPool(
taskRunner: TaskRunner,
/** The maximum number of idle connections for each address. */ /** The maximum number of idle connections for each address. */
private val maxIdleConnections: Int, private val maxIdleConnections: Int,
keepAliveDuration: Long, keepAliveDuration: Long,
@@ -40,24 +39,14 @@ class RealConnectionPool(
) { ) {
private val keepAliveDurationNs: Long = timeUnit.toNanos(keepAliveDuration) private val keepAliveDurationNs: Long = timeUnit.toNanos(keepAliveDuration)
private val cleanupRunnable = object : Runnable { private val cleanupQueue: TaskQueue = taskRunner.newQueue(this)
override fun run() { private val cleanupTask = object : Task("OkHttp ConnectionPool") {
while (true) { override fun runOnce() = cleanup(System.nanoTime())
val waitNanos = cleanup(System.nanoTime()) override fun tryCancel() = true
if (waitNanos == -1L) return
try {
this@RealConnectionPool.lockAndWaitNanos(waitNanos)
} catch (ie: InterruptedException) {
// Will cause the thread to exit unless other connections are created!
evictAll()
}
}
}
} }
private val connections = ArrayDeque<RealConnection>() private val connections = ArrayDeque<RealConnection>()
val routeDatabase = RouteDatabase() val routeDatabase = RouteDatabase()
var cleanupRunning: Boolean = false
init { init {
// Put a floor on the keep alive duration, otherwise cleanup will spin loop. // Put a floor on the keep alive duration, otherwise cleanup will spin loop.
@@ -98,11 +87,8 @@ class RealConnectionPool(
fun put(connection: RealConnection) { fun put(connection: RealConnection) {
assert(Thread.holdsLock(this)) assert(Thread.holdsLock(this))
if (!cleanupRunning) {
cleanupRunning = true
executor.execute(cleanupRunnable)
}
connections.add(connection) connections.add(connection)
cleanupQueue.schedule(cleanupTask)
} }
/** /**
@@ -113,10 +99,10 @@ class RealConnectionPool(
assert(Thread.holdsLock(this)) assert(Thread.holdsLock(this))
return if (connection.noNewExchanges || maxIdleConnections == 0) { return if (connection.noNewExchanges || maxIdleConnections == 0) {
connections.remove(connection) connections.remove(connection)
if (connections.isEmpty()) cleanupQueue.cancelAll()
true true
} else { } else {
// Awake the cleanup thread: we may have exceeded the idle connection limit. cleanupQueue.schedule(cleanupTask)
this.notifyAll()
false false
} }
} }
@@ -133,6 +119,7 @@ class RealConnectionPool(
i.remove() i.remove()
} }
} }
if (connections.isEmpty()) cleanupQueue.cancelAll()
} }
for (connection in evictedConnections) { for (connection in evictedConnections) {
@@ -144,8 +131,8 @@ class RealConnectionPool(
* Performs maintenance on this pool, evicting the connection that has been idle the longest if * Performs maintenance on this pool, evicting the connection that has been idle the longest if
* either it has exceeded the keep alive limit or the idle connections limit. * either it has exceeded the keep alive limit or the idle connections limit.
* *
* Returns the duration in nanos to sleep until the next scheduled call to this method. Returns * Returns the duration in nanoseconds to sleep until the next scheduled call to this method.
* -1 if no further cleanups are required. * Returns -1 if no further cleanups are required.
*/ */
fun cleanup(now: Long): Long { fun cleanup(now: Long): Long {
var inUseConnectionCount = 0 var inUseConnectionCount = 0
@@ -178,6 +165,7 @@ class RealConnectionPool(
// We've found a connection to evict. Remove it from the list, then close it below // We've found a connection to evict. Remove it from the list, then close it below
// (outside of the synchronized block). // (outside of the synchronized block).
connections.remove(longestIdleConnection) connections.remove(longestIdleConnection)
if (connections.isEmpty()) cleanupQueue.cancelAll()
} }
idleConnectionCount > 0 -> { idleConnectionCount > 0 -> {
// A connection will be ready to evict soon. // A connection will be ready to evict soon.
@@ -190,7 +178,6 @@ class RealConnectionPool(
} }
else -> { else -> {
// No connections, idle or in use. // No connections, idle or in use.
cleanupRunning = false
return -1 return -1
} }
} }
@@ -199,7 +186,7 @@ class RealConnectionPool(
longestIdleConnection!!.socket().closeQuietly() longestIdleConnection!!.socket().closeQuietly()
// Cleanup again immediately. // Cleanup again immediately.
return 0 return 0L
} }
/** /**
@@ -250,19 +237,6 @@ class RealConnectionPool(
} }
companion object { companion object {
/**
* Background threads are used to cleanup expired connections. There will be at most a single
* thread running per connection pool. The thread pool executor permits the pool itself to be
* garbage collected.
*/
private val executor = ThreadPoolExecutor(
0, // corePoolSize.
Int.MAX_VALUE, // maximumPoolSize.
60L, TimeUnit.SECONDS, // keepAliveTime.
SynchronousQueue(),
threadFactory("OkHttp ConnectionPool", true)
)
fun get(connectionPool: ConnectionPool): RealConnectionPool = connectionPool.delegate fun get(connectionPool: ConnectionPool): RealConnectionPool = connectionPool.delegate
} }
} }

View File

@@ -37,7 +37,7 @@ class TaskRunnerRealBackendTest {
@Test fun test() { @Test fun test() {
val t1 = System.nanoTime() / 1e6 val t1 = System.nanoTime() / 1e6
queue.schedule(object : Task("task", false) { queue.schedule(object : Task("task") {
val delays = mutableListOf(TimeUnit.MILLISECONDS.toNanos(1000), -1L) val delays = mutableListOf(TimeUnit.MILLISECONDS.toNanos(1000), -1L)
override fun runOnce(): Long { override fun runOnce(): Long {
log.put("runOnce delays.size=${delays.size}") log.put("runOnce delays.size=${delays.size}")
@@ -53,6 +53,6 @@ class TaskRunnerRealBackendTest {
val t3 = System.nanoTime() / 1e6 - t1 val t3 = System.nanoTime() / 1e6 - t1
assertThat(t3).isCloseTo(1750.0, Offset.offset(250.0)) assertThat(t3).isCloseTo(1750.0, Offset.offset(250.0))
backend.shutDown() backend.shutdown()
} }
} }

View File

@@ -36,32 +36,28 @@ class TaskRunnerTest {
private val blueQueue = taskRunner.newQueue("blue") private val blueQueue = taskRunner.newQueue("blue")
private val greenQueue = taskRunner.newQueue("green") private val greenQueue = taskRunner.newQueue("green")
init {
backend.taskRunner = taskRunner
}
@Test fun executeDelayed() { @Test fun executeDelayed() {
redQueue.schedule(object : Task("task", false) { redQueue.schedule(object : Task("task") {
override fun runOnce(): Long { override fun runOnce(): Long {
log += "run@${backend.nanoTime()}" log += "run@${backend.nanoTime()}"
return -1L return -1L
} }
}, 100L) }, 100L)
backend.advanceUntil(0L) backend.advanceUntil(taskRunner, 0L)
assertThat(log).containsExactly() assertThat(log).containsExactly()
backend.advanceUntil(99L) backend.advanceUntil(taskRunner, 99L)
assertThat(log).containsExactly() assertThat(log).containsExactly()
backend.advanceUntil(100L) backend.advanceUntil(taskRunner, 100L)
assertThat(log).containsExactly("run@100") assertThat(log).containsExactly("run@100")
backend.assertNoMoreTasks() backend.assertNoMoreTasks()
} }
@Test fun executeRepeated() { @Test fun executeRepeated() {
redQueue.schedule(object : Task("task", false) { redQueue.schedule(object : Task("task") {
val delays = mutableListOf(50L, 150L, -1L) val delays = mutableListOf(50L, 150L, -1L)
override fun runOnce(): Long { override fun runOnce(): Long {
log += "run@${backend.nanoTime()}" log += "run@${backend.nanoTime()}"
@@ -69,19 +65,19 @@ class TaskRunnerTest {
} }
}, 100L) }, 100L)
backend.advanceUntil(0L) backend.advanceUntil(taskRunner, 0L)
assertThat(log).containsExactly() assertThat(log).containsExactly()
backend.advanceUntil(100L) backend.advanceUntil(taskRunner, 100L)
assertThat(log).containsExactly("run@100") assertThat(log).containsExactly("run@100")
backend.advanceUntil(150L) backend.advanceUntil(taskRunner, 150L)
assertThat(log).containsExactly("run@100", "run@150") assertThat(log).containsExactly("run@100", "run@150")
backend.advanceUntil(299L) backend.advanceUntil(taskRunner, 299L)
assertThat(log).containsExactly("run@100", "run@150") assertThat(log).containsExactly("run@100", "run@150")
backend.advanceUntil(300L) backend.advanceUntil(taskRunner, 300L)
assertThat(log).containsExactly("run@100", "run@150", "run@300") assertThat(log).containsExactly("run@100", "run@150", "run@300")
backend.assertNoMoreTasks() backend.assertNoMoreTasks()
@@ -89,7 +85,7 @@ class TaskRunnerTest {
/** Repeat with a delay of 200 but schedule with a delay of 50. The schedule wins. */ /** Repeat with a delay of 200 but schedule with a delay of 50. The schedule wins. */
@Test fun executeScheduledEarlierReplacesRepeatedLater() { @Test fun executeScheduledEarlierReplacesRepeatedLater() {
redQueue.schedule(object : Task("task", false) { redQueue.schedule(object : Task("task") {
val schedules = mutableListOf(50L) val schedules = mutableListOf(50L)
val delays = mutableListOf(200L, -1L) val delays = mutableListOf(200L, -1L)
override fun runOnce(): Long { override fun runOnce(): Long {
@@ -101,13 +97,13 @@ class TaskRunnerTest {
} }
}, 100L) }, 100L)
backend.advanceUntil(0L) backend.advanceUntil(taskRunner, 0L)
assertThat(log).isEmpty() assertThat(log).isEmpty()
backend.advanceUntil(100L) backend.advanceUntil(taskRunner, 100L)
assertThat(log).containsExactly("run@100") assertThat(log).containsExactly("run@100")
backend.advanceUntil(150L) backend.advanceUntil(taskRunner, 150L)
assertThat(log).containsExactly("run@100", "run@150") assertThat(log).containsExactly("run@100", "run@150")
backend.assertNoMoreTasks() backend.assertNoMoreTasks()
@@ -115,7 +111,7 @@ class TaskRunnerTest {
/** Schedule with a delay of 200 but repeat with a delay of 50. The repeat wins. */ /** Schedule with a delay of 200 but repeat with a delay of 50. The repeat wins. */
@Test fun executeRepeatedEarlierReplacesScheduledLater() { @Test fun executeRepeatedEarlierReplacesScheduledLater() {
redQueue.schedule(object : Task("task", false) { redQueue.schedule(object : Task("task") {
val schedules = mutableListOf(200L) val schedules = mutableListOf(200L)
val delays = mutableListOf(50L, -1L) val delays = mutableListOf(50L, -1L)
override fun runOnce(): Long { override fun runOnce(): Long {
@@ -127,20 +123,20 @@ class TaskRunnerTest {
} }
}, 100L) }, 100L)
backend.advanceUntil(0L) backend.advanceUntil(taskRunner, 0L)
assertThat(log).isEmpty() assertThat(log).isEmpty()
backend.advanceUntil(100L) backend.advanceUntil(taskRunner, 100L)
assertThat(log).containsExactly("run@100") assertThat(log).containsExactly("run@100")
backend.advanceUntil(150L) backend.advanceUntil(taskRunner, 150L)
assertThat(log).containsExactly("run@100", "run@150") assertThat(log).containsExactly("run@100", "run@150")
backend.assertNoMoreTasks() backend.assertNoMoreTasks()
} }
@Test fun cancelReturnsTruePreventsNextExecution() { @Test fun cancelReturnsTruePreventsNextExecution() {
redQueue.schedule(object : Task("task", false) { redQueue.schedule(object : Task("task") {
override fun runOnce(): Long { override fun runOnce(): Long {
log += "run@${backend.nanoTime()}" log += "run@${backend.nanoTime()}"
return -1L return -1L
@@ -152,19 +148,19 @@ class TaskRunnerTest {
} }
}, 100L) }, 100L)
backend.advanceUntil(0L) backend.advanceUntil(taskRunner, 0L)
assertThat(log).isEmpty() assertThat(log).isEmpty()
redQueue.cancelAll() redQueue.cancelAll()
backend.advanceUntil(99L) backend.advanceUntil(taskRunner, 99L)
assertThat(log).containsExactly("cancel@99") assertThat(log).containsExactly("cancel@99")
backend.assertNoMoreTasks() backend.assertNoMoreTasks()
} }
@Test fun cancelReturnsFalseDoesNotCancel() { @Test fun cancelReturnsFalseDoesNotCancel() {
redQueue.schedule(object : Task("task", false) { redQueue.schedule(object : Task("task") {
override fun runOnce(): Long { override fun runOnce(): Long {
log += "run@${backend.nanoTime()}" log += "run@${backend.nanoTime()}"
return -1L return -1L
@@ -176,22 +172,22 @@ class TaskRunnerTest {
} }
}, 100L) }, 100L)
backend.advanceUntil(0L) backend.advanceUntil(taskRunner, 0L)
assertThat(log).isEmpty() assertThat(log).isEmpty()
redQueue.cancelAll() redQueue.cancelAll()
backend.advanceUntil(99L) backend.advanceUntil(taskRunner, 99L)
assertThat(log).containsExactly("cancel@99") assertThat(log).containsExactly("cancel@99")
backend.advanceUntil(100L) backend.advanceUntil(taskRunner, 100L)
assertThat(log).containsExactly("cancel@99", "run@100") assertThat(log).containsExactly("cancel@99", "run@100")
backend.assertNoMoreTasks() backend.assertNoMoreTasks()
} }
@Test fun cancelWhileExecutingPreventsRepeat() { @Test fun cancelWhileExecutingPreventsRepeat() {
redQueue.schedule(object : Task("task", false) { redQueue.schedule(object : Task("task") {
override fun runOnce(): Long { override fun runOnce(): Long {
log += "run@${backend.nanoTime()}" log += "run@${backend.nanoTime()}"
redQueue.cancelAll() redQueue.cancelAll()
@@ -204,17 +200,17 @@ class TaskRunnerTest {
} }
}, 100L) }, 100L)
backend.advanceUntil(0L) backend.advanceUntil(taskRunner, 0L)
assertThat(log).isEmpty() assertThat(log).isEmpty()
backend.advanceUntil(100L) backend.advanceUntil(taskRunner, 100L)
assertThat(log).containsExactly("run@100", "cancel@100") assertThat(log).containsExactly("run@100", "cancel@100")
backend.assertNoMoreTasks() backend.assertNoMoreTasks()
} }
@Test fun cancelWhileExecutingDoesNothingIfTaskDoesNotRepeat() { @Test fun cancelWhileExecutingDoesNothingIfTaskDoesNotRepeat() {
redQueue.schedule(object : Task("task", false) { redQueue.schedule(object : Task("task") {
override fun runOnce(): Long { override fun runOnce(): Long {
log += "run@${backend.nanoTime()}" log += "run@${backend.nanoTime()}"
redQueue.cancelAll() redQueue.cancelAll()
@@ -227,17 +223,17 @@ class TaskRunnerTest {
} }
}, 100L) }, 100L)
backend.advanceUntil(0L) backend.advanceUntil(taskRunner, 0L)
assertThat(log).isEmpty() assertThat(log).isEmpty()
backend.advanceUntil(100L) backend.advanceUntil(taskRunner, 100L)
assertThat(log).containsExactly("run@100") assertThat(log).containsExactly("run@100")
backend.assertNoMoreTasks() backend.assertNoMoreTasks()
} }
@Test fun interruptingCoordinatorAttemptsToCancelsAndSucceeds() { @Test fun interruptingCoordinatorAttemptsToCancelsAndSucceeds() {
redQueue.schedule(object : Task("task", false) { redQueue.schedule(object : Task("task") {
override fun runOnce(): Long { override fun runOnce(): Long {
log += "run@${backend.nanoTime()}" log += "run@${backend.nanoTime()}"
return -1L return -1L
@@ -249,19 +245,19 @@ class TaskRunnerTest {
} }
}, 100L) }, 100L)
backend.advanceUntil(0L) backend.advanceUntil(taskRunner, 0L)
assertThat(log).isEmpty() assertThat(log).isEmpty()
backend.interruptCoordinatorThread() backend.interruptCoordinatorThread(taskRunner)
backend.advanceUntil(0L) backend.advanceUntil(taskRunner, 0L)
assertThat(log).containsExactly("cancel@0") assertThat(log).containsExactly("cancel@0")
backend.assertNoMoreTasks() backend.assertNoMoreTasks()
} }
@Test fun interruptingCoordinatorAttemptsToCancelsAndFails() { @Test fun interruptingCoordinatorAttemptsToCancelsAndFails() {
redQueue.schedule(object : Task("task", false) { redQueue.schedule(object : Task("task") {
override fun runOnce(): Long { override fun runOnce(): Long {
log += "run@${backend.nanoTime()}" log += "run@${backend.nanoTime()}"
return -1L return -1L
@@ -273,15 +269,15 @@ class TaskRunnerTest {
} }
}, 100L) }, 100L)
backend.advanceUntil(0L) backend.advanceUntil(taskRunner, 0L)
assertThat(log).isEmpty() assertThat(log).isEmpty()
backend.interruptCoordinatorThread() backend.interruptCoordinatorThread(taskRunner)
backend.advanceUntil(0L) backend.advanceUntil(taskRunner, 0L)
assertThat(log).containsExactly("cancel@0") assertThat(log).containsExactly("cancel@0")
backend.advanceUntil(100L) backend.advanceUntil(taskRunner, 100L)
assertThat(log).containsExactly("cancel@0", "run@100") assertThat(log).containsExactly("cancel@0", "run@100")
backend.assertNoMoreTasks() backend.assertNoMoreTasks()
@@ -289,31 +285,31 @@ class TaskRunnerTest {
/** Inspect how many runnables have been enqueued. If none then we're truly sequential. */ /** Inspect how many runnables have been enqueued. If none then we're truly sequential. */
@Test fun singleQueueIsSerial() { @Test fun singleQueueIsSerial() {
redQueue.schedule(object : Task("task one", false) { redQueue.schedule(object : Task("task one") {
override fun runOnce(): Long { override fun runOnce(): Long {
log += "one:run@${backend.nanoTime()} tasksSize=${backend.tasksSize}" log += "one:run@${backend.nanoTime()} tasksSize=${backend.tasksSize}"
return -1L return -1L
} }
}, 100L) }, 100L)
redQueue.schedule(object : Task("task two", false) { redQueue.schedule(object : Task("task two") {
override fun runOnce(): Long { override fun runOnce(): Long {
log += "two:run@${backend.nanoTime()} tasksSize=${backend.tasksSize}" log += "two:run@${backend.nanoTime()} tasksSize=${backend.tasksSize}"
return -1L return -1L
} }
}, 100L) }, 100L)
redQueue.schedule(object : Task("task three", false) { redQueue.schedule(object : Task("task three") {
override fun runOnce(): Long { override fun runOnce(): Long {
log += "three:run@${backend.nanoTime()} tasksSize=${backend.tasksSize}" log += "three:run@${backend.nanoTime()} tasksSize=${backend.tasksSize}"
return -1L return -1L
} }
}, 100L) }, 100L)
backend.advanceUntil(0L) backend.advanceUntil(taskRunner, 0L)
assertThat(log).isEmpty() assertThat(log).isEmpty()
backend.advanceUntil(100L) backend.advanceUntil(taskRunner, 100L)
assertThat(log).containsExactly( assertThat(log).containsExactly(
"one:run@100 tasksSize=0", "one:run@100 tasksSize=0",
"two:run@100 tasksSize=0", "two:run@100 tasksSize=0",
@@ -325,31 +321,31 @@ class TaskRunnerTest {
/** Inspect how many runnables have been enqueued. If non-zero then we're truly parallel. */ /** Inspect how many runnables have been enqueued. If non-zero then we're truly parallel. */
@Test fun differentQueuesAreParallel() { @Test fun differentQueuesAreParallel() {
redQueue.schedule(object : Task("task one", false) { redQueue.schedule(object : Task("task one") {
override fun runOnce(): Long { override fun runOnce(): Long {
log += "one:run@${backend.nanoTime()} tasksSize=${backend.tasksSize}" log += "one:run@${backend.nanoTime()} tasksSize=${backend.tasksSize}"
return -1L return -1L
} }
}, 100L) }, 100L)
blueQueue.schedule(object : Task("task two", false) { blueQueue.schedule(object : Task("task two") {
override fun runOnce(): Long { override fun runOnce(): Long {
log += "two:run@${backend.nanoTime()} tasksSize=${backend.tasksSize}" log += "two:run@${backend.nanoTime()} tasksSize=${backend.tasksSize}"
return -1L return -1L
} }
}, 100L) }, 100L)
greenQueue.schedule(object : Task("task three", false) { greenQueue.schedule(object : Task("task three") {
override fun runOnce(): Long { override fun runOnce(): Long {
log += "three:run@${backend.nanoTime()} tasksSize=${backend.tasksSize}" log += "three:run@${backend.nanoTime()} tasksSize=${backend.tasksSize}"
return -1L return -1L
} }
}, 100L) }, 100L)
backend.advanceUntil(0L) backend.advanceUntil(taskRunner, 0L)
assertThat(log).isEmpty() assertThat(log).isEmpty()
backend.advanceUntil(100L) backend.advanceUntil(taskRunner, 100L)
assertThat(log).containsExactly( assertThat(log).containsExactly(
"one:run@100 tasksSize=2", "one:run@100 tasksSize=2",
"two:run@100 tasksSize=1", "two:run@100 tasksSize=1",
@@ -361,13 +357,13 @@ class TaskRunnerTest {
/** Test the introspection method [TaskQueue.scheduledTasks]. */ /** Test the introspection method [TaskQueue.scheduledTasks]. */
@Test fun scheduledTasks() { @Test fun scheduledTasks() {
redQueue.schedule(object : Task("task one", false) { redQueue.schedule(object : Task("task one") {
override fun runOnce(): Long = -1L override fun runOnce(): Long = -1L
override fun toString() = "one" override fun toString() = "one"
}, 100L) }, 100L)
redQueue.schedule(object : Task("task two", false) { redQueue.schedule(object : Task("task two") {
override fun runOnce(): Long = -1L override fun runOnce(): Long = -1L
override fun toString() = "two" override fun toString() = "two"
@@ -381,7 +377,7 @@ class TaskRunnerTest {
* cumbersome to implement properly because the active task might be a cancel. * cumbersome to implement properly because the active task might be a cancel.
*/ */
@Test fun scheduledTasksDoesNotIncludeRunningTask() { @Test fun scheduledTasksDoesNotIncludeRunningTask() {
redQueue.schedule(object : Task("task one", false) { redQueue.schedule(object : Task("task one") {
val schedules = mutableListOf(200L) val schedules = mutableListOf(200L)
override fun runOnce(): Long { override fun runOnce(): Long {
if (schedules.isNotEmpty()) { if (schedules.isNotEmpty()) {
@@ -394,7 +390,7 @@ class TaskRunnerTest {
override fun toString() = "one" override fun toString() = "one"
}, 100L) }, 100L)
redQueue.schedule(object : Task("task two", false) { redQueue.schedule(object : Task("task two") {
override fun runOnce(): Long { override fun runOnce(): Long {
log += "scheduledTasks=${redQueue.scheduledTasks}" log += "scheduledTasks=${redQueue.scheduledTasks}"
return -1L return -1L
@@ -403,18 +399,18 @@ class TaskRunnerTest {
override fun toString() = "two" override fun toString() = "two"
}, 200L) }, 200L)
backend.advanceUntil(100L) backend.advanceUntil(taskRunner, 100L)
assertThat(log).containsExactly( assertThat(log).containsExactly(
"scheduledTasks=[two, one]" "scheduledTasks=[two, one]"
) )
backend.advanceUntil(200L) backend.advanceUntil(taskRunner, 200L)
assertThat(log).containsExactly( assertThat(log).containsExactly(
"scheduledTasks=[two, one]", "scheduledTasks=[two, one]",
"scheduledTasks=[one]" "scheduledTasks=[one]"
) )
backend.advanceUntil(300L) backend.advanceUntil(taskRunner, 300L)
assertThat(log).containsExactly( assertThat(log).containsExactly(
"scheduledTasks=[two, one]", "scheduledTasks=[two, one]",
"scheduledTasks=[one]", "scheduledTasks=[one]",
@@ -430,42 +426,42 @@ class TaskRunnerTest {
* queues have work scheduled. * queues have work scheduled.
*/ */
@Test fun activeQueuesContainsOnlyQueuesWithScheduledTasks() { @Test fun activeQueuesContainsOnlyQueuesWithScheduledTasks() {
redQueue.schedule(object : Task("task one", false) { redQueue.schedule(object : Task("task one") {
override fun runOnce() = -1L override fun runOnce() = -1L
}, 100L) }, 100L)
blueQueue.schedule(object : Task("task two", false) { blueQueue.schedule(object : Task("task two") {
override fun runOnce() = -1L override fun runOnce() = -1L
}, 200L) }, 200L)
backend.advanceUntil(0L) backend.advanceUntil(taskRunner, 0L)
assertThat(taskRunner.activeQueues()).containsExactly(redQueue, blueQueue) assertThat(taskRunner.activeQueues()).containsExactly(redQueue, blueQueue)
backend.advanceUntil(100L) backend.advanceUntil(taskRunner, 100L)
assertThat(taskRunner.activeQueues()).containsExactly(blueQueue) assertThat(taskRunner.activeQueues()).containsExactly(blueQueue)
backend.advanceUntil(200L) backend.advanceUntil(taskRunner, 200L)
assertThat(taskRunner.activeQueues()).isEmpty() assertThat(taskRunner.activeQueues()).isEmpty()
backend.assertNoMoreTasks() backend.assertNoMoreTasks()
} }
@Test fun taskNameIsUsedForThreadNameWhenRunning() { @Test fun taskNameIsUsedForThreadNameWhenRunning() {
redQueue.schedule(object : Task("lucky task", false) { redQueue.schedule(object : Task("lucky task") {
override fun runOnce(): Long { override fun runOnce(): Long {
log += "run threadName:${Thread.currentThread().name}" log += "run threadName:${Thread.currentThread().name}"
return -1L return -1L
} }
}, 0L) })
backend.advanceUntil(0L) backend.advanceUntil(taskRunner, 0L)
assertThat(log).containsExactly("run threadName:lucky task") assertThat(log).containsExactly("run threadName:lucky task")
backend.assertNoMoreTasks() backend.assertNoMoreTasks()
} }
@Test fun taskNameIsUsedForThreadNameWhenCanceling() { @Test fun taskNameIsUsedForThreadNameWhenCanceling() {
redQueue.schedule(object : Task("lucky task", false) { redQueue.schedule(object : Task("lucky task") {
override fun tryCancel(): Boolean { override fun tryCancel(): Boolean {
log += "cancel threadName:${Thread.currentThread().name}" log += "cancel threadName:${Thread.currentThread().name}"
return true return true
@@ -474,12 +470,12 @@ class TaskRunnerTest {
override fun runOnce() = -1L override fun runOnce() = -1L
}, 100L) }, 100L)
backend.advanceUntil(0L) backend.advanceUntil(taskRunner, 0L)
assertThat(log).isEmpty() assertThat(log).isEmpty()
redQueue.cancelAll() redQueue.cancelAll()
backend.advanceUntil(0L) backend.advanceUntil(taskRunner, 0L)
assertThat(log).containsExactly("cancel threadName:lucky task") assertThat(log).containsExactly("cancel threadName:lucky task")
backend.assertNoMoreTasks() backend.assertNoMoreTasks()
@@ -498,9 +494,6 @@ class TaskRunnerTest {
/** How many tasks can be executed immediately. */ /** How many tasks can be executed immediately. */
val tasksSize: Int get() = tasks.size val tasksSize: Int get() = tasks.size
/** The task runner to lock on. */
lateinit var taskRunner: TaskRunner
/** Guarded by taskRunner. */ /** Guarded by taskRunner. */
private var nanoTime = 0L private var nanoTime = 0L
@@ -513,12 +506,10 @@ class TaskRunnerTest {
} }
override fun executeTask(runnable: Runnable) { override fun executeTask(runnable: Runnable) {
check(Thread.holdsLock(taskRunner))
tasks += runnable tasks += runnable
} }
override fun nanoTime(): Long { override fun nanoTime(): Long {
check(Thread.holdsLock(taskRunner))
return nanoTime return nanoTime
} }
@@ -540,14 +531,14 @@ class TaskRunnerTest {
} }
/** Advance the simulated clock and run anything ready at the new time. */ /** Advance the simulated clock and run anything ready at the new time. */
fun advanceUntil(newTime: Long) { fun advanceUntil(taskRunner: TaskRunner, newTime: Long) {
check(!Thread.holdsLock(taskRunner)) check(!Thread.holdsLock(taskRunner))
synchronized(taskRunner) { synchronized(taskRunner) {
nanoTime = newTime nanoTime = newTime
while (true) { while (true) {
runRunnables() runRunnables(taskRunner)
if (coordinatorWaitingUntilTime <= nanoTime) { if (coordinatorWaitingUntilTime <= nanoTime) {
// Let the coordinator do its business at the new time. // Let the coordinator do its business at the new time.
@@ -561,7 +552,7 @@ class TaskRunnerTest {
} }
/** Returns true if anything was executed. */ /** Returns true if anything was executed. */
private fun runRunnables() { private fun runRunnables(taskRunner: TaskRunner) {
check(Thread.holdsLock(taskRunner)) check(Thread.holdsLock(taskRunner))
if (coordinatorToRun != null) { if (coordinatorToRun != null) {
@@ -593,7 +584,7 @@ class TaskRunnerTest {
assertThat(coordinatorWaitingUntilTime).isEqualTo(Long.MAX_VALUE) assertThat(coordinatorWaitingUntilTime).isEqualTo(Long.MAX_VALUE)
} }
fun interruptCoordinatorThread() { fun interruptCoordinatorThread(taskRunner: TaskRunner) {
check(!Thread.holdsLock(taskRunner)) check(!Thread.holdsLock(taskRunner))
synchronized(taskRunner) { synchronized(taskRunner) {

View File

@@ -30,12 +30,16 @@ import okhttp3.OkHttpClient;
import okhttp3.Request; import okhttp3.Request;
import okhttp3.Route; import okhttp3.Route;
import okhttp3.internal.RecordingOkAuthenticator; import okhttp3.internal.RecordingOkAuthenticator;
import okhttp3.internal.concurrent.TaskRunner;
import okhttp3.internal.concurrent.TaskRunnerTest;
import org.junit.Test; import org.junit.Test;
import static okhttp3.TestUtil.awaitGarbageCollection; import static okhttp3.TestUtil.awaitGarbageCollection;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
public final class ConnectionPoolTest { public final class ConnectionPoolTest {
/** The fake task runner prevents the cleanup runnable from being started. */
private final TaskRunner taskRunner = new TaskRunner(new TaskRunnerTest.FakeBackend());
private final Address addressA = newAddress("a"); private final Address addressA = newAddress("a");
private final Route routeA1 = newRoute(addressA); private final Route routeA1 = newRoute(addressA);
private final Address addressB = newAddress("b"); private final Address addressB = newAddress("b");
@@ -44,8 +48,8 @@ public final class ConnectionPoolTest {
private final Route routeC1 = newRoute(addressC); private final Route routeC1 = newRoute(addressC);
@Test public void connectionsEvictedWhenIdleLongEnough() throws Exception { @Test public void connectionsEvictedWhenIdleLongEnough() throws Exception {
RealConnectionPool pool = new RealConnectionPool(Integer.MAX_VALUE, 100L, TimeUnit.NANOSECONDS); RealConnectionPool pool = new RealConnectionPool(
pool.setCleanupRunning(true); // Prevent the cleanup runnable from being started. taskRunner, Integer.MAX_VALUE, 100L, TimeUnit.NANOSECONDS);
RealConnection c1 = newConnection(pool, routeA1, 50L); RealConnection c1 = newConnection(pool, routeA1, 50L);
@@ -76,9 +80,9 @@ public final class ConnectionPoolTest {
} }
@Test public void inUseConnectionsNotEvicted() throws Exception { @Test public void inUseConnectionsNotEvicted() throws Exception {
ConnectionPool poolApi = new ConnectionPool(Integer.MAX_VALUE, 100L, TimeUnit.NANOSECONDS); RealConnectionPool pool = new RealConnectionPool(
RealConnectionPool pool = RealConnectionPool.Companion.get(poolApi); taskRunner, Integer.MAX_VALUE, 100L, TimeUnit.NANOSECONDS);
pool.setCleanupRunning(true); // Prevent the cleanup runnable from being started. ConnectionPool poolApi = new ConnectionPool(pool);
RealConnection c1 = newConnection(pool, routeA1, 50L); RealConnection c1 = newConnection(pool, routeA1, 50L);
synchronized (pool) { synchronized (pool) {
@@ -108,8 +112,8 @@ public final class ConnectionPoolTest {
} }
@Test public void cleanupPrioritizesEarliestEviction() throws Exception { @Test public void cleanupPrioritizesEarliestEviction() throws Exception {
RealConnectionPool pool = new RealConnectionPool(Integer.MAX_VALUE, 100L, TimeUnit.NANOSECONDS); RealConnectionPool pool = new RealConnectionPool(
pool.setCleanupRunning(true); // Prevent the cleanup runnable from being started. taskRunner, Integer.MAX_VALUE, 100L, TimeUnit.NANOSECONDS);
RealConnection c1 = newConnection(pool, routeA1, 75L); RealConnection c1 = newConnection(pool, routeA1, 75L);
RealConnection c2 = newConnection(pool, routeB1, 50L); RealConnection c2 = newConnection(pool, routeB1, 50L);
@@ -140,8 +144,8 @@ public final class ConnectionPoolTest {
} }
@Test public void oldestConnectionsEvictedIfIdleLimitExceeded() throws Exception { @Test public void oldestConnectionsEvictedIfIdleLimitExceeded() throws Exception {
RealConnectionPool pool = new RealConnectionPool(2, 100L, TimeUnit.NANOSECONDS); RealConnectionPool pool = new RealConnectionPool(
pool.setCleanupRunning(true); // Prevent the cleanup runnable from being started. taskRunner, 2, 100L, TimeUnit.NANOSECONDS);
RealConnection c1 = newConnection(pool, routeA1, 50L); RealConnection c1 = newConnection(pool, routeA1, 50L);
RealConnection c2 = newConnection(pool, routeB1, 75L); RealConnection c2 = newConnection(pool, routeB1, 75L);
@@ -164,9 +168,9 @@ public final class ConnectionPoolTest {
} }
@Test public void leakedAllocation() throws Exception { @Test public void leakedAllocation() throws Exception {
ConnectionPool poolApi = new ConnectionPool(Integer.MAX_VALUE, 100L, TimeUnit.NANOSECONDS); RealConnectionPool pool = new RealConnectionPool(
RealConnectionPool pool = RealConnectionPool.Companion.get(poolApi); taskRunner, Integer.MAX_VALUE, 100L, TimeUnit.NANOSECONDS);
pool.setCleanupRunning(true); // Prevent the cleanup runnable from being started. ConnectionPool poolApi = new ConnectionPool(pool);
RealConnection c1 = newConnection(pool, routeA1, 0L); RealConnection c1 = newConnection(pool, routeA1, 0L);
allocateAndLeakAllocation(poolApi, c1); allocateAndLeakAllocation(poolApi, c1);
@@ -180,24 +184,26 @@ public final class ConnectionPoolTest {
} }
@Test public void interruptStopsThread() throws Exception { @Test public void interruptStopsThread() throws Exception {
RealConnectionPool pool = new RealConnectionPool(2, 100L, TimeUnit.NANOSECONDS); TaskRunner realTaskRunner = TaskRunner.Companion.getINSTANCE();
RealConnectionPool pool = new RealConnectionPool(
realTaskRunner, 2, 100L, TimeUnit.NANOSECONDS);
RealConnection c1 = newConnection(pool, routeA1, Long.MAX_VALUE); RealConnection c1 = newConnection(pool, routeA1, Long.MAX_VALUE);
assertThat(pool.getCleanupRunning()).isTrue(); assertThat(realTaskRunner.activeQueues()).isNotEmpty();
Thread.sleep(100); Thread.sleep(100);
Thread[] threads = new Thread[Thread.activeCount() * 2]; Thread[] threads = new Thread[Thread.activeCount() * 2];
Thread.enumerate(threads); Thread.enumerate(threads);
for (Thread t: threads) { for (Thread t: threads) {
if (t != null && t.getName().equals("OkHttp ConnectionPool")) { if (t != null && t.getName().equals("OkHttp Task Coordinator")) {
t.interrupt(); t.interrupt();
} }
} }
Thread.sleep(100); Thread.sleep(100);
assertThat(pool.getCleanupRunning()).isFalse(); assertThat(realTaskRunner.activeQueues()).isEmpty();
} }
/** Use a helper method so there's no hidden reference remaining on the stack. */ /** Use a helper method so there's no hidden reference remaining on the stack. */