1
0
mirror of https://github.com/square/okhttp.git synced 2026-01-12 10:23:16 +03:00

Merge pull request #5502 from square/jwilson.0926.http2

TaskRunner support for shutting down queues
This commit is contained in:
Jesse Wilson
2019-09-27 17:23:08 -04:00
committed by GitHub
11 changed files with 242 additions and 111 deletions

View File

@@ -25,6 +25,7 @@ import okhttp3.Request
import okhttp3.Response
import okhttp3.internal.addHeaderLenient
import okhttp3.internal.closeQuietly
import okhttp3.internal.concurrent.TaskRunner
import okhttp3.internal.duplex.MwsDuplexAccess
import okhttp3.internal.execute
import okhttp3.internal.http.HttpMethod
@@ -99,6 +100,7 @@ import javax.net.ssl.X509TrustManager
* in sequence.
*/
class MockWebServer : ExternalResource(), Closeable {
private val taskRunner = TaskRunner()
private val requestQueue = LinkedBlockingQueue<RecordedRequest>()
private val openClientSockets =
Collections.newSetFromMap(ConcurrentHashMap<Socket, Boolean>())
@@ -454,6 +456,12 @@ class MockWebServer : ExternalResource(), Closeable {
} catch (e: InterruptedException) {
throw AssertionError()
}
for (queue in taskRunner.activeQueues()) {
if (!queue.awaitIdle(TimeUnit.MILLISECONDS.toNanos(500L))) {
throw IOException("Gave up waiting for ${queue.owner} to shut down")
}
}
}
@Synchronized override fun after() {
@@ -533,7 +541,7 @@ class MockWebServer : ExternalResource(), Closeable {
if (protocol === Protocol.HTTP_2 || protocol === Protocol.H2_PRIOR_KNOWLEDGE) {
val http2SocketHandler = Http2SocketHandler(socket, protocol)
val connection = Http2Connection.Builder(false)
val connection = Http2Connection.Builder(false, taskRunner)
.socket(socket)
.listener(http2SocketHandler)
.build()

View File

@@ -29,6 +29,7 @@ import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;
import okhttp3.Headers;
import okhttp3.Protocol;
import okhttp3.internal.concurrent.TaskRunner;
import okhttp3.internal.http2.Header;
import okhttp3.internal.http2.Http2Connection;
import okhttp3.internal.http2.Http2Stream;
@@ -69,7 +70,7 @@ public final class Http2Server extends Http2Connection.Listener {
if (protocol != Protocol.HTTP_2) {
throw new ProtocolException("Protocol " + protocol + " unsupported");
}
Http2Connection connection = new Http2Connection.Builder(false)
Http2Connection connection = new Http2Connection.Builder(false, TaskRunner.INSTANCE)
.socket(sslSocket)
.listener(this)
.build();

View File

@@ -15,8 +15,6 @@
*/
package okhttp3
import okhttp3.internal.concurrent.Task
import okhttp3.internal.concurrent.TaskQueue
import okhttp3.internal.concurrent.TaskRunner
import okhttp3.testing.Flaky
import org.assertj.core.api.Assertions.assertThat
@@ -25,7 +23,6 @@ import org.junit.runner.Description
import org.junit.runners.model.Statement
import java.net.InetAddress
import java.util.concurrent.ConcurrentLinkedDeque
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
/** Apply this rule to tests that need an OkHttpClient instance. */
@@ -66,7 +63,7 @@ class OkHttpClientTestRule : TestRule {
private fun ensureAllTaskQueuesIdle() {
for (queue in TaskRunner.INSTANCE.activeQueues()) {
assertThat(queue.awaitIdle(500L, TimeUnit.MILLISECONDS))
assertThat(queue.awaitIdle(TimeUnit.MILLISECONDS.toNanos(500L)))
.withFailMessage("Queue ${queue.owner} still active after 500ms")
.isTrue()
}
@@ -133,19 +130,6 @@ class OkHttpClientTestRule : TestRule {
}
}
/** Returns true if this queue became idle before the timeout elapsed. */
private fun TaskQueue.awaitIdle(timeout: Long, timeUnit: TimeUnit): Boolean {
val latch = CountDownLatch(1)
schedule(object : Task("awaitIdle") {
override fun runOnce(): Long {
latch.countDown()
return -1L
}
})
return latch.await(timeout, timeUnit)
}
companion object {
/**
* Quick and dirty pool of OkHttpClient instances. Each has its own independent dispatcher and

View File

@@ -49,7 +49,6 @@ import java.util.LinkedHashMap
import java.util.Locale
import java.util.TimeZone
import java.util.concurrent.Executor
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.ThreadFactory
import java.util.concurrent.TimeUnit
import kotlin.text.Charsets.UTF_32BE
@@ -393,14 +392,6 @@ inline fun Executor.execute(name: String, crossinline block: () -> Unit) {
}
}
/** Executes [block] unless this executor has been shutdown, in which case this does nothing. */
inline fun Executor.tryExecute(name: String, crossinline block: () -> Unit) {
try {
execute(name, block)
} catch (_: RejectedExecutionException) {
}
}
fun Buffer.skipAll(b: Byte): Int {
var count = 0
while (!exhausted() && this[0] == b) {
@@ -510,34 +501,9 @@ fun Long.toHexString(): String = java.lang.Long.toHexString(this)
fun Int.toHexString(): String = Integer.toHexString(this)
/**
* Lock and wait a duration in nanoseconds. Unlike [java.lang.Object.wait] this interprets 0 as
* "don't wait" instead of "wait forever".
*/
@Throws(InterruptedException::class)
fun Any.lockAndWaitNanos(nanos: Long) {
synchronized(this) {
objectWaitNanos(nanos)
}
}
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN", "NOTHING_TO_INLINE")
inline fun Any.wait() = (this as Object).wait()
/**
* Wait a duration in nanoseconds. Unlike [java.lang.Object.wait] this interprets 0 as "don't wait"
* instead of "wait forever".
*/
@Throws(InterruptedException::class)
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
fun Any.objectWaitNanos(nanos: Long) {
val ms = nanos / 1_000_000L
val ns = nanos - (ms * 1_000_000L)
if (ms > 0L || nanos > 0) {
(this as Object).wait(ms, ns.toInt())
}
}
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN", "NOTHING_TO_INLINE")
inline fun Any.notify() = (this as Object).notify()

View File

@@ -16,6 +16,9 @@
package okhttp3.internal.concurrent
import okhttp3.internal.addIfAbsent
import java.util.concurrent.CountDownLatch
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.TimeUnit
/**
* A set of tasks that are executed in sequential order.
@@ -32,6 +35,8 @@ class TaskQueue internal constructor(
*/
val owner: Any
) {
private var shutdown = false
/** This queue's currently-executing task, or null if none is currently executing. */
private var activeTask: Task? = null
@@ -61,19 +66,55 @@ class TaskQueue internal constructor(
* The target execution time is implemented on a best-effort basis. If another task in this queue
* is running when that time is reached, that task is allowed to complete before this task is
* started. Similarly the task will be delayed if the host lacks compute resources.
*
* @throws RejectedExecutionException if the queue is shut down.
*/
fun schedule(task: Task, delayNanos: Long = 0L) {
task.initQueue(this)
synchronized(taskRunner) {
if (shutdown) throw RejectedExecutionException()
if (scheduleAndDecide(task, delayNanos)) {
taskRunner.kickCoordinator(this)
}
}
}
/** Like [schedule], but this silently discard the task if the queue is shut down. */
fun trySchedule(task: Task, delayNanos: Long = 0L) {
synchronized(taskRunner) {
if (shutdown) return
if (scheduleAndDecide(task, delayNanos)) {
taskRunner.kickCoordinator(this)
}
}
}
/** Returns true if this queue became idle before the timeout elapsed. */
fun awaitIdle(delayNanos: Long): Boolean {
val latch = CountDownLatch(1)
val task = object : Task("awaitIdle") {
override fun runOnce(): Long {
latch.countDown()
return -1L
}
}
// Don't delegate to schedule because that has to honor shutdown rules.
synchronized(taskRunner) {
if (scheduleAndDecide(task, 0L)) {
taskRunner.kickCoordinator(this)
}
}
return latch.await(delayNanos, TimeUnit.NANOSECONDS)
}
/** Adds [task] to run in [delayNanos]. Returns true if the coordinator should run. */
private fun scheduleAndDecide(task: Task, delayNanos: Long): Boolean {
task.initQueue(this)
val now = taskRunner.backend.nanoTime()
val executeNanoTime = now + delayNanos
@@ -100,6 +141,8 @@ class TaskQueue internal constructor(
* be removed from the execution schedule.
*/
fun cancelAll() {
check(!Thread.holdsLock(this))
synchronized(taskRunner) {
if (cancelAllAndDecide()) {
taskRunner.kickCoordinator(this)
@@ -107,6 +150,17 @@ class TaskQueue internal constructor(
}
}
fun shutdown() {
check(!Thread.holdsLock(this))
synchronized(taskRunner) {
shutdown = true
if (cancelAllAndDecide()) {
taskRunner.kickCoordinator(this)
}
}
}
/** Returns true if the coordinator should run. */
private fun cancelAllAndDecide(): Boolean {
val runningTask = activeTask
@@ -160,7 +214,7 @@ class TaskQueue internal constructor(
synchronized(taskRunner) {
check(activeTask === task)
if (delayNanos != -1L) {
if (delayNanos != -1L && !shutdown) {
scheduleAndDecide(task, delayNanos)
} else if (!futureTasks.contains(task)) {
cancelTasks.remove(task) // We don't need to cancel it because it isn't scheduled.

View File

@@ -17,7 +17,6 @@ package okhttp3.internal.concurrent
import okhttp3.internal.addIfAbsent
import okhttp3.internal.notify
import okhttp3.internal.objectWaitNanos
import okhttp3.internal.threadFactory
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.SynchronousQueue
@@ -158,8 +157,18 @@ class TaskRunner(
taskRunner.notify()
}
/**
* Wait a duration in nanoseconds. Unlike [java.lang.Object.wait] this interprets 0 as
* "don't wait" instead of "wait forever".
*/
@Throws(InterruptedException::class)
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
override fun coordinatorWait(taskRunner: TaskRunner, nanos: Long) {
taskRunner.objectWaitNanos(nanos)
val ms = nanos / 1_000_000L
val ns = nanos - (ms * 1_000_000L)
if (ms > 0L || nanos > 0) {
(taskRunner as Object).wait(ms, ns.toInt())
}
}
fun shutdown() {

View File

@@ -33,6 +33,7 @@ import okhttp3.Response
import okhttp3.Route
import okhttp3.internal.EMPTY_RESPONSE
import okhttp3.internal.closeQuietly
import okhttp3.internal.concurrent.TaskRunner
import okhttp3.internal.http.ExchangeCodec
import okhttp3.internal.http1.Http1ExchangeCodec
import okhttp3.internal.http2.ConnectionShutdownException
@@ -321,7 +322,7 @@ class RealConnection(
val source = this.source!!
val sink = this.sink!!
socket.soTimeout = 0 // HTTP/2 connection timeouts are set per-stream.
val http2Connection = Http2Connection.Builder(true)
val http2Connection = Http2Connection.Builder(client = true, taskRunner = TaskRunner.INSTANCE)
.socket(socket, route.address.url.host, source, sink)
.listener(this)
.pingIntervalMillis(pingIntervalMillis)

View File

@@ -18,9 +18,10 @@ package okhttp3.internal.http2
import okhttp3.internal.EMPTY_BYTE_ARRAY
import okhttp3.internal.EMPTY_HEADERS
import okhttp3.internal.closeQuietly
import okhttp3.internal.concurrent.Task
import okhttp3.internal.concurrent.TaskRunner
import okhttp3.internal.connectionName
import okhttp3.internal.execute
import okhttp3.internal.format
import okhttp3.internal.http2.ErrorCode.REFUSED_STREAM
import okhttp3.internal.http2.Settings.Companion.DEFAULT_INITIAL_WINDOW_SIZE
import okhttp3.internal.ignoreIoExceptions
@@ -28,9 +29,7 @@ import okhttp3.internal.notifyAll
import okhttp3.internal.platform.Platform
import okhttp3.internal.platform.Platform.Companion.INFO
import okhttp3.internal.threadFactory
import okhttp3.internal.threadName
import okhttp3.internal.toHeaders
import okhttp3.internal.tryExecute
import okhttp3.internal.wait
import okio.Buffer
import okio.BufferedSink
@@ -43,12 +42,9 @@ import java.io.Closeable
import java.io.IOException
import java.io.InterruptedIOException
import java.net.Socket
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeUnit.MILLISECONDS
/**
* A socket connection to a remote peer. A connection hosts streams which can send and receive
@@ -91,13 +87,10 @@ class Http2Connection internal constructor(builder: Builder) : Closeable {
internal set
/** Asynchronously writes frames to the outgoing socket. */
private val writerExecutor = ScheduledThreadPoolExecutor(1,
threadFactory(format("OkHttp %s Writer", connectionName), false))
private val writerQueue = builder.taskRunner.newQueue("$connectionName Writer")
/** Ensures push promise callbacks events are sent in order per stream. */
// Like newSingleThreadExecutor, except lazy creates the thread.
private val pushExecutor = ThreadPoolExecutor(0, 1, 60L, TimeUnit.SECONDS, LinkedBlockingQueue(),
threadFactory(format("OkHttp %s Push Observer", connectionName), true))
private val pushQueue = builder.taskRunner.newQueue("$connectionName Push")
/** User code to run in response to push promise events. */
private val pushObserver: PushObserver = builder.pushObserver
@@ -149,11 +142,15 @@ class Http2Connection internal constructor(builder: Builder) : Closeable {
init {
if (builder.pingIntervalMillis != 0) {
writerExecutor.scheduleAtFixedRate({
threadName("OkHttp $connectionName ping") {
val pingIntervalNanos = TimeUnit.MILLISECONDS.toNanos(builder.pingIntervalMillis.toLong())
writerQueue.schedule(object : Task("OkHttp $connectionName ping") {
override fun runOnce(): Long {
writePing(false, 0, 0)
return pingIntervalNanos
}
}, builder.pingIntervalMillis.toLong(), builder.pingIntervalMillis.toLong(), MILLISECONDS)
override fun tryCancel() = true
}, pingIntervalNanos)
}
}
@@ -328,13 +325,18 @@ class Http2Connection internal constructor(builder: Builder) : Closeable {
streamId: Int,
errorCode: ErrorCode
) {
writerExecutor.tryExecute("OkHttp $connectionName stream $streamId") {
try {
writeSynReset(streamId, errorCode)
} catch (e: IOException) {
failConnection(e)
writerQueue.trySchedule(object : Task("OkHttp $connectionName stream $streamId") {
override fun runOnce(): Long {
try {
writeSynReset(streamId, errorCode)
} catch (e: IOException) {
failConnection(e)
}
return -1L
}
}
override fun tryCancel() = true
})
}
@Throws(IOException::class)
@@ -349,13 +351,18 @@ class Http2Connection internal constructor(builder: Builder) : Closeable {
streamId: Int,
unacknowledgedBytesRead: Long
) {
writerExecutor.tryExecute("OkHttp Window Update $connectionName stream $streamId") {
try {
writer.windowUpdate(streamId, unacknowledgedBytesRead)
} catch (e: IOException) {
failConnection(e)
writerQueue.trySchedule(object : Task("OkHttp Window Update $connectionName stream $streamId") {
override fun runOnce(): Long {
try {
writer.windowUpdate(streamId, unacknowledgedBytesRead)
} catch (e: IOException) {
failConnection(e)
}
return -1L
}
}
override fun tryCancel() = true
})
}
fun writePing(
@@ -467,8 +474,8 @@ class Http2Connection internal constructor(builder: Builder) : Closeable {
}
// Release the threads.
writerExecutor.shutdown()
pushExecutor.shutdown()
writerQueue.shutdown()
pushQueue.shutdown()
}
private fun failConnection(e: IOException?) {
@@ -511,7 +518,8 @@ class Http2Connection internal constructor(builder: Builder) : Closeable {
class Builder(
/** True if this peer initiated the connection; false if this peer accepted the connection. */
internal var client: Boolean
internal var client: Boolean,
internal val taskRunner: TaskRunner
) {
internal lateinit var socket: Socket
internal lateinit var connectionName: String
@@ -659,9 +667,14 @@ class Http2Connection internal constructor(builder: Builder) : Closeable {
}
override fun settings(clearPrevious: Boolean, settings: Settings) {
writerExecutor.tryExecute("OkHttp $connectionName ACK Settings") {
applyAndAckSettings(clearPrevious, settings)
}
writerQueue.trySchedule(object : Task("OkHttp $connectionName ACK Settings") {
override fun runOnce(): Long {
applyAndAckSettings(clearPrevious, settings)
return -1L
}
override fun tryCancel() = true
})
}
/**
@@ -725,9 +738,14 @@ class Http2Connection internal constructor(builder: Builder) : Closeable {
}
} else {
// Send a reply to a client ping if this is a server and vice versa.
writerExecutor.tryExecute("OkHttp $connectionName ping") {
writePing(true, payload1, payload2)
}
writerQueue.trySchedule(object : Task("OkHttp $connectionName ping") {
override fun runOnce(): Long {
writePing(true, payload1, payload2)
return -1L
}
override fun tryCancel() = true
})
}
}
@@ -812,8 +830,8 @@ class Http2Connection internal constructor(builder: Builder) : Closeable {
}
currentPushRequests.add(streamId)
}
if (!isShutdown) {
pushExecutor.tryExecute("OkHttp $connectionName Push Request[$streamId]") {
pushQueue.trySchedule(object : Task("OkHttp $connectionName Push Request[$streamId]") {
override fun runOnce(): Long {
val cancel = pushObserver.onRequest(streamId, requestHeaders)
ignoreIoExceptions {
if (cancel) {
@@ -823,8 +841,11 @@ class Http2Connection internal constructor(builder: Builder) : Closeable {
}
}
}
return -1L
}
}
override fun tryCancel() = true
})
}
internal fun pushHeadersLater(
@@ -832,8 +853,8 @@ class Http2Connection internal constructor(builder: Builder) : Closeable {
requestHeaders: List<Header>,
inFinished: Boolean
) {
if (!isShutdown) {
pushExecutor.tryExecute("OkHttp $connectionName Push Headers[$streamId]") {
pushQueue.trySchedule(object : Task("OkHttp $connectionName Push Headers[$streamId]") {
override fun runOnce(): Long {
val cancel = pushObserver.onHeaders(streamId, requestHeaders, inFinished)
ignoreIoExceptions {
if (cancel) writer.rstStream(streamId, ErrorCode.CANCEL)
@@ -843,8 +864,11 @@ class Http2Connection internal constructor(builder: Builder) : Closeable {
}
}
}
return -1L
}
}
override fun tryCancel() = true
})
}
/**
@@ -861,8 +885,8 @@ class Http2Connection internal constructor(builder: Builder) : Closeable {
val buffer = Buffer()
source.require(byteCount.toLong()) // Eagerly read the frame before firing client thread.
source.read(buffer, byteCount.toLong())
if (!isShutdown) {
pushExecutor.execute("OkHttp $connectionName Push Data[$streamId]") {
pushQueue.trySchedule(object : Task("OkHttp $connectionName Push Data[$streamId]") {
override fun runOnce(): Long {
ignoreIoExceptions {
val cancel = pushObserver.onData(streamId, buffer, byteCount, inFinished)
if (cancel) writer.rstStream(streamId, ErrorCode.CANCEL)
@@ -872,19 +896,23 @@ class Http2Connection internal constructor(builder: Builder) : Closeable {
}
}
}
return -1L
}
}
override fun tryCancel() = true
})
}
internal fun pushResetLater(streamId: Int, errorCode: ErrorCode) {
if (!isShutdown) {
pushExecutor.execute("OkHttp $connectionName Push Reset[$streamId]") {
pushQueue.trySchedule(object : Task("OkHttp $connectionName Push Reset[$streamId]") {
override fun runOnce(): Long {
pushObserver.onReset(streamId, errorCode)
synchronized(this@Http2Connection) {
currentPushRequests.remove(streamId)
}
return -1L
}
}
})
}
/** Listener of streams and settings initiated by the peer. */

View File

@@ -40,7 +40,7 @@ class TaskFaker {
/** How many tasks can be executed immediately. */
val tasksSize: Int get() = tasks.size
/** Guarded by taskRunner. */
/** Guarded by [taskRunner]. */
var nanoTime = 0L
private set
@@ -143,7 +143,9 @@ class TaskFaker {
fun assertNoMoreTasks() {
assertThat(coordinatorToRun).isNull()
assertThat(tasks).isEmpty()
assertThat(coordinatorWaitingUntilTime).isEqualTo(Long.MAX_VALUE)
assertThat(coordinatorWaitingUntilTime)
.withFailMessage("tasks are scheduled to run at $coordinatorWaitingUntilTime")
.isEqualTo(Long.MAX_VALUE)
}
fun interruptCoordinatorThread() {

View File

@@ -16,7 +16,9 @@
package okhttp3.internal.concurrent
import org.assertj.core.api.Assertions.assertThat
import org.junit.Assert.fail
import org.junit.Test
import java.util.concurrent.RejectedExecutionException
class TaskRunnerTest {
private val taskFaker = TaskFaker()
@@ -470,4 +472,79 @@ class TaskRunnerTest {
taskFaker.assertNoMoreTasks()
}
@Test fun shutdownSuccessfullyCancelsScheduledTasks() {
redQueue.schedule(object : Task("task") {
override fun runOnce(): Long {
log += "run@${taskFaker.nanoTime}"
return -1L
}
override fun tryCancel(): Boolean {
log += "cancel@${taskFaker.nanoTime}"
return true
}
}, 100L)
taskFaker.advanceUntil(0L)
assertThat(log).isEmpty()
redQueue.shutdown()
taskFaker.advanceUntil(99L)
assertThat(log).containsExactly("cancel@99")
taskFaker.assertNoMoreTasks()
}
@Test fun shutdownFailsToCancelsScheduledTasks() {
redQueue.schedule(object : Task("task") {
override fun runOnce(): Long {
log += "run@${taskFaker.nanoTime}"
return 50L
}
override fun tryCancel(): Boolean {
log += "cancel@${taskFaker.nanoTime}"
return false
}
}, 100L)
taskFaker.advanceUntil(0L)
assertThat(log).isEmpty()
redQueue.shutdown()
taskFaker.advanceUntil(99L)
assertThat(log).containsExactly("cancel@99")
taskFaker.advanceUntil(100L)
assertThat(log).containsExactly("cancel@99", "run@100")
taskFaker.assertNoMoreTasks()
}
@Test fun scheduleDiscardsTaskWhenShutdown() {
redQueue.shutdown()
redQueue.trySchedule(object : Task("task") {
override fun runOnce() = -1L
}, 100L)
taskFaker.assertNoMoreTasks()
}
@Test fun scheduleThrowsWhenShutdown() {
redQueue.shutdown()
try {
redQueue.schedule(object : Task("task") {
override fun runOnce() = -1L
}, 100L)
fail()
} catch (_: RejectedExecutionException) {
}
taskFaker.assertNoMoreTasks()
}
}

View File

@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import okhttp3.Headers;
import okhttp3.internal.Util;
import okhttp3.internal.concurrent.TaskRunner;
import okhttp3.internal.http2.MockHttp2Peer.InFrame;
import okio.AsyncTimeout;
import okio.Buffer;
@@ -498,7 +499,7 @@ public final class Http2ConnectionTest {
String longString = repeat('a', Http2.INITIAL_MAX_FRAME_SIZE + 1);
Socket socket = peer.openSocket();
Http2Connection connection = new Http2Connection.Builder(true)
Http2Connection connection = new Http2Connection.Builder(true, TaskRunner.INSTANCE)
.socket(socket)
.pushObserver(IGNORE)
.build();
@@ -1786,7 +1787,7 @@ public final class Http2ConnectionTest {
peer.acceptFrame(); // GOAWAY
peer.play();
Http2Connection connection = new Http2Connection.Builder(true)
Http2Connection connection = new Http2Connection.Builder(true, TaskRunner.INSTANCE)
.socket(peer.openSocket())
.build();
connection.start(false);
@@ -1857,7 +1858,7 @@ public final class Http2ConnectionTest {
/** Builds a new connection to {@code peer} with settings acked. */
private Http2Connection connect(MockHttp2Peer peer, PushObserver pushObserver,
Http2Connection.Listener listener) throws Exception {
Http2Connection connection = new Http2Connection.Builder(true)
Http2Connection connection = new Http2Connection.Builder(true, TaskRunner.INSTANCE)
.socket(peer.openSocket())
.pushObserver(pushObserver)
.listener(listener)