mirror of
https://github.com/square/okhttp.git
synced 2025-08-07 12:42:57 +03:00
Start fewer threads in TaskRunner (#8391)
We've got a race where we'll start a thread when we need one, even if we've already started a thread. This changes TaskRunner's behavior to never add a thread if we're still waiting for a recently-added one to start running. This is intended to reduce the number of threads contenting for the TaskRunner lock as reported in this issue: https://github.com/square/okhttp/issues/8388
This commit is contained in:
@@ -63,29 +63,32 @@ class TaskFaker : Closeable {
|
||||
|
||||
/**
|
||||
* True if this task faker has ever had multiple tasks scheduled to run concurrently. Guarded by
|
||||
* [taskRunner].
|
||||
* [TaskRunner.lock].
|
||||
*/
|
||||
var isParallel = false
|
||||
|
||||
/** Number of calls to [TaskRunner.Backend.execute]. Guarded by [TaskRunner.lock]. */
|
||||
var executeCallCount = 0
|
||||
|
||||
/** Guarded by [taskRunner]. */
|
||||
var nanoTime = 0L
|
||||
private set
|
||||
|
||||
/** Backlog of tasks to run. Only one task runs at a time. Guarded by [taskRunner]. */
|
||||
/** Backlog of tasks to run. Only one task runs at a time. Guarded by [TaskRunner.lock]. */
|
||||
private val serialTaskQueue = ArrayDeque<SerialTask>()
|
||||
|
||||
/** The task that's currently executing. Guarded by [taskRunner]. */
|
||||
/** The task that's currently executing. Guarded by [TaskRunner.lock]. */
|
||||
private var currentTask: SerialTask = TestThreadSerialTask
|
||||
|
||||
/** The coordinator task if it's waiting, and how it will resume. Guarded by [taskRunner]. */
|
||||
/** The coordinator task if it's waiting, and how it will resume. Guarded by [TaskRunner.lock]. */
|
||||
private var waitingCoordinatorTask: SerialTask? = null
|
||||
private var waitingCoordinatorInterrupted = false
|
||||
private var waitingCoordinatorNotified = false
|
||||
|
||||
/** How many times a new task has been started. Guarded by [taskRunner]. */
|
||||
/** How many times a new task has been started. Guarded by [TaskRunner.lock]. */
|
||||
private var contextSwitchCount = 0
|
||||
|
||||
/** Guarded by [taskRunner]. */
|
||||
/** Guarded by [TaskRunner.lock]. */
|
||||
private var activeThreads = 0
|
||||
|
||||
/** A task runner that posts tasks to this fake. Tasks won't be executed until requested. */
|
||||
@@ -100,6 +103,7 @@ class TaskFaker : Closeable {
|
||||
|
||||
val queuedTask = RunnableSerialTask(runnable)
|
||||
serialTaskQueue += queuedTask
|
||||
executeCallCount++
|
||||
isParallel = serialTaskQueue.size > 1
|
||||
}
|
||||
|
||||
|
@@ -52,6 +52,17 @@ class TaskRunner(
|
||||
private var coordinatorWaiting = false
|
||||
private var coordinatorWakeUpAt = 0L
|
||||
|
||||
/**
|
||||
* When we need a new thread to run tasks, we call [Backend.execute]. A few microseconds later we
|
||||
* expect a newly-started thread to call [Runnable.run]. We shouldn't request new threads until
|
||||
* the already-requested ones are in service, otherwise we might create more threads than we need.
|
||||
*
|
||||
* We use [executeCallCount] and [runCallCount] to defend against starting more threads than we
|
||||
* need. Both fields are guarded by [lock].
|
||||
*/
|
||||
private var executeCallCount = 0
|
||||
private var runCallCount = 0
|
||||
|
||||
/** Queues with tasks that are currently executing their [TaskQueue.activeTask]. */
|
||||
private val busyQueues = mutableListOf<TaskQueue>()
|
||||
|
||||
@@ -61,9 +72,14 @@ class TaskRunner(
|
||||
private val runnable: Runnable =
|
||||
object : Runnable {
|
||||
override fun run() {
|
||||
var incrementedRunCallCount = false
|
||||
while (true) {
|
||||
val task =
|
||||
this@TaskRunner.lock.withLock {
|
||||
if (!incrementedRunCallCount) {
|
||||
incrementedRunCallCount = true
|
||||
runCallCount++
|
||||
}
|
||||
awaitTaskToRun()
|
||||
} ?: return
|
||||
|
||||
@@ -76,7 +92,7 @@ class TaskRunner(
|
||||
// If the task is crashing start another thread to service the queues.
|
||||
if (!completedNormally) {
|
||||
lock.withLock {
|
||||
backend.execute(this@TaskRunner, this)
|
||||
startAnotherThread()
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -99,7 +115,7 @@ class TaskRunner(
|
||||
if (coordinatorWaiting) {
|
||||
backend.coordinatorNotify(this@TaskRunner)
|
||||
} else {
|
||||
backend.execute(this@TaskRunner, runnable)
|
||||
startAnotherThread()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -157,7 +173,7 @@ class TaskRunner(
|
||||
* Returns an immediately-executable task for the calling thread to execute, sleeping as necessary
|
||||
* until one is ready. If there are no ready queues, or if other threads have everything under
|
||||
* control this will return null. If there is more than a single task ready to execute immediately
|
||||
* this will launch another thread to handle that work.
|
||||
* this will start another thread to handle that work.
|
||||
*/
|
||||
fun awaitTaskToRun(): Task? {
|
||||
lock.assertHeld()
|
||||
@@ -207,7 +223,7 @@ class TaskRunner(
|
||||
|
||||
// Also start another thread if there's more work or scheduling to do.
|
||||
if (multipleReadyTasks || !coordinatorWaiting && readyQueues.isNotEmpty()) {
|
||||
backend.execute(this@TaskRunner, runnable)
|
||||
startAnotherThread()
|
||||
}
|
||||
|
||||
return readyTask
|
||||
@@ -238,6 +254,15 @@ class TaskRunner(
|
||||
}
|
||||
}
|
||||
|
||||
/** Start another thread, unless a new thread is already scheduled to start. */
|
||||
private fun startAnotherThread() {
|
||||
lock.assertHeld()
|
||||
if (executeCallCount > runCallCount) return // A thread is still starting.
|
||||
|
||||
executeCallCount++
|
||||
backend.execute(this@TaskRunner, runnable)
|
||||
}
|
||||
|
||||
fun newQueue(): TaskQueue {
|
||||
val name = lock.withLock { nextQueueName++ }
|
||||
return TaskQueue(this, "Q$name")
|
||||
|
@@ -680,6 +680,96 @@ class TaskRunnerTest {
|
||||
assertThat(idleLatch2).isSameAs(idleLatch1)
|
||||
}
|
||||
|
||||
@Test fun cancelAllWhenEmptyDoesNotStartWorkerThread() {
|
||||
redQueue.execute("red task", 100.µs) {
|
||||
error("expected to be canceled")
|
||||
}
|
||||
assertThat(taskFaker.executeCallCount).isEqualTo(1)
|
||||
|
||||
blueQueue.execute("task", 100.µs) {
|
||||
error("expected to be canceled")
|
||||
}
|
||||
assertThat(taskFaker.executeCallCount).isEqualTo(1)
|
||||
|
||||
redQueue.cancelAll()
|
||||
assertThat(taskFaker.executeCallCount).isEqualTo(1)
|
||||
|
||||
blueQueue.cancelAll()
|
||||
assertThat(taskFaker.executeCallCount).isEqualTo(1)
|
||||
}
|
||||
|
||||
@Test fun noMoreThanOneWorkerThreadWaitingToStartAtATime() {
|
||||
// Enqueueing the red task starts a thread because the head of the queue changed.
|
||||
redQueue.execute("red task") {
|
||||
log += "red:starting@${taskFaker.nanoTime}"
|
||||
taskFaker.sleep(100.µs)
|
||||
log += "red:finishing@${taskFaker.nanoTime}"
|
||||
}
|
||||
assertThat(taskFaker.executeCallCount).isEqualTo(1)
|
||||
|
||||
// Enqueueing the blue task doesn't start a thread because the red one is still starting.
|
||||
blueQueue.execute("blue task") {
|
||||
log += "blue:starting@${taskFaker.nanoTime}"
|
||||
taskFaker.sleep(100.µs)
|
||||
log += "blue:finishing@${taskFaker.nanoTime}"
|
||||
}
|
||||
assertThat(taskFaker.executeCallCount).isEqualTo(1)
|
||||
|
||||
// Running the red task starts another thread, so the two can run in parallel.
|
||||
taskFaker.runNextTask()
|
||||
assertThat(log).containsExactly("red:starting@0")
|
||||
assertThat(taskFaker.executeCallCount).isEqualTo(2)
|
||||
|
||||
// Next the blue task starts.
|
||||
taskFaker.runNextTask()
|
||||
assertThat(log).containsExactly(
|
||||
"red:starting@0",
|
||||
"blue:starting@0",
|
||||
)
|
||||
assertThat(taskFaker.executeCallCount).isEqualTo(2)
|
||||
|
||||
// Advance time until the tasks complete.
|
||||
taskFaker.advanceUntil(100.µs)
|
||||
assertThat(log).containsExactly(
|
||||
"red:starting@0",
|
||||
"blue:starting@0",
|
||||
"red:finishing@100000",
|
||||
"blue:finishing@100000",
|
||||
)
|
||||
taskFaker.assertNoMoreTasks()
|
||||
assertThat(taskFaker.executeCallCount).isEqualTo(2)
|
||||
}
|
||||
|
||||
@Test fun onlyOneCoordinatorWaitingToStartFutureTasks() {
|
||||
// Enqueueing the red task starts a coordinator thread.
|
||||
redQueue.execute("red task", 100.µs) {
|
||||
log += "red:run@${taskFaker.nanoTime}"
|
||||
}
|
||||
assertThat(taskFaker.executeCallCount).isEqualTo(1)
|
||||
|
||||
// Enqueueing the blue task doesn't need a 2nd coordinator yet.
|
||||
blueQueue.execute("blue task", 200.µs) {
|
||||
log += "blue:run@${taskFaker.nanoTime}"
|
||||
}
|
||||
assertThat(taskFaker.executeCallCount).isEqualTo(1)
|
||||
|
||||
// Nothing to do.
|
||||
taskFaker.runTasks()
|
||||
assertThat(log).isEmpty()
|
||||
|
||||
// At 100.µs, the coordinator runs the red task and starts a thread for the new coordinator.
|
||||
taskFaker.advanceUntil(100.µs)
|
||||
assertThat(log).containsExactly("red:run@100000")
|
||||
assertThat(taskFaker.executeCallCount).isEqualTo(2)
|
||||
|
||||
// At 200.µs, the blue task runs.
|
||||
taskFaker.advanceUntil(200.µs)
|
||||
assertThat(log).containsExactly("red:run@100000", "blue:run@200000")
|
||||
assertThat(taskFaker.executeCallCount).isEqualTo(2)
|
||||
|
||||
taskFaker.assertNoMoreTasks()
|
||||
}
|
||||
|
||||
private val Int.µs: Long
|
||||
get() = this * 1_000L
|
||||
}
|
||||
|
Reference in New Issue
Block a user