From 33db7d9ef67ebc876fac50aea7eee7d81ff4bc2f Mon Sep 17 00:00:00 2001 From: Jesse Wilson Date: Sat, 4 May 2019 20:49:46 -0400 Subject: [PATCH] Convert Http2Stream to Kotlin (#5009) * Rename Http2Stream.java to .kt * Convert Http2Stream to Kotlin --- .../internal/http2/Http2Connection.java | 4 +- .../okhttp3/internal/http2/Http2Stream.java | 673 ------------------ .../okhttp3/internal/http2/Http2Stream.kt | 652 +++++++++++++++++ .../internal/http2/Http2ConnectionTest.java | 14 +- 4 files changed, 662 insertions(+), 681 deletions(-) delete mode 100644 okhttp/src/main/java/okhttp3/internal/http2/Http2Stream.java create mode 100644 okhttp/src/main/java/okhttp3/internal/http2/Http2Stream.kt diff --git a/okhttp/src/main/java/okhttp3/internal/http2/Http2Connection.java b/okhttp/src/main/java/okhttp3/internal/http2/Http2Connection.java index 69a47eca3..1ee08c2a0 100644 --- a/okhttp/src/main/java/okhttp3/internal/http2/Http2Connection.java +++ b/okhttp/src/main/java/okhttp3/internal/http2/Http2Connection.java @@ -245,7 +245,9 @@ public final class Http2Connection implements Closeable { streamId = nextStreamId; nextStreamId += 2; stream = new Http2Stream(streamId, this, outFinished, inFinished, null); - flushHeaders = !out || bytesLeftInWriteWindow == 0L || stream.bytesLeftInWriteWindow == 0L; + flushHeaders = !out + || bytesLeftInWriteWindow == 0L + || stream.getBytesLeftInWriteWindow() == 0L; if (stream.isOpen()) { streams.put(streamId, stream); } diff --git a/okhttp/src/main/java/okhttp3/internal/http2/Http2Stream.java b/okhttp/src/main/java/okhttp3/internal/http2/Http2Stream.java deleted file mode 100644 index c8049e9ac..000000000 --- a/okhttp/src/main/java/okhttp3/internal/http2/Http2Stream.java +++ /dev/null @@ -1,673 +0,0 @@ -/* - * Copyright (C) 2011 The Android Open Source Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package okhttp3.internal.http2; - -import java.io.EOFException; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.net.SocketTimeoutException; -import java.util.ArrayDeque; -import java.util.Deque; -import java.util.List; -import javax.annotation.Nullable; -import okhttp3.Headers; -import okhttp3.internal.Util; -import okio.AsyncTimeout; -import okio.Buffer; -import okio.BufferedSource; -import okio.Sink; -import okio.Source; -import okio.Timeout; - -/** A logical bidirectional stream. */ -public final class Http2Stream { - // Internal state is guarded by this. No long-running or potentially - // blocking operations are performed while the lock is held. - - /** - * The total number of bytes consumed by the application (with {@link FramingSource#read}), but - * not yet acknowledged by sending a {@code WINDOW_UPDATE} frame on this stream. - */ - // Visible for testing - long unacknowledgedBytesRead = 0; - - /** - * Count of bytes that can be written on the stream before receiving a window update. Even if this - * is positive, writes will block until there available bytes in {@code - * connection.bytesLeftInWriteWindow}. - */ - // guarded by this - long bytesLeftInWriteWindow; - - final int id; - final Http2Connection connection; - - /** - * Received headers yet to be {@linkplain #takeHeaders taken}, or {@linkplain FramingSource#read - * read}. - */ - private final Deque headersQueue = new ArrayDeque<>(); - - /** True if response headers have been sent or received. */ - private boolean hasResponseHeaders; - - private final FramingSource source; - final FramingSink sink; - final StreamTimeout readTimeout = new StreamTimeout(); - final StreamTimeout writeTimeout = new StreamTimeout(); - - /** - * The reason why this stream was abnormally closed. If there are multiple reasons to abnormally - * close this stream (such as both peers closing it near-simultaneously) then this is the first - * reason known to this peer. - */ - @Nullable ErrorCode errorCode; - - /** The exception that explains {@code errorCode}. Null if no exception was provided. */ - @Nullable IOException errorException; - - Http2Stream(int id, Http2Connection connection, boolean outFinished, boolean inFinished, - @Nullable Headers headers) { - if (connection == null) throw new NullPointerException("connection == null"); - - this.id = id; - this.connection = connection; - this.bytesLeftInWriteWindow = - connection.peerSettings.getInitialWindowSize(); - this.source = new FramingSource(connection.okHttpSettings.getInitialWindowSize()); - this.sink = new FramingSink(); - this.source.finished = inFinished; - this.sink.finished = outFinished; - if (headers != null) { - headersQueue.add(headers); - } - - if (isLocallyInitiated() && headers != null) { - throw new IllegalStateException("locally-initiated streams shouldn't have headers yet"); - } else if (!isLocallyInitiated() && headers == null) { - throw new IllegalStateException("remotely-initiated streams should have headers"); - } - } - - public int getId() { - return id; - } - - /** - * Returns true if this stream is open. A stream is open until either: - * - * - * - *

Note that the input stream may continue to yield data even after a stream reports itself as - * not open. This is because input data is buffered. - */ - public synchronized boolean isOpen() { - if (errorCode != null) { - return false; - } - if ((source.finished || source.closed) - && (sink.finished || sink.closed) - && hasResponseHeaders) { - return false; - } - return true; - } - - /** Returns true if this stream was created by this peer. */ - public boolean isLocallyInitiated() { - boolean streamIsClient = ((id & 1) == 1); - return connection.client == streamIsClient; - } - - public Http2Connection getConnection() { - return connection; - } - - /** - * Removes and returns the stream's received response headers, blocking if necessary until headers - * have been received. If the returned list contains multiple blocks of headers the blocks will be - * delimited by 'null'. - */ - public synchronized Headers takeHeaders() throws IOException { - readTimeout.enter(); - try { - while (headersQueue.isEmpty() && errorCode == null) { - waitForIo(); - } - } finally { - readTimeout.exitAndThrowIfTimedOut(); - } - if (!headersQueue.isEmpty()) { - return headersQueue.removeFirst(); - } - throw errorException != null ? errorException : new StreamResetException(errorCode); - } - - /** - * Returns the trailers. It is only safe to call this once the source stream has been completely - * exhausted. - */ - public synchronized Headers trailers() throws IOException { - if (errorCode != null) { - throw errorException != null ? errorException : new StreamResetException(errorCode); - } - if (!source.finished || !source.receiveBuffer.exhausted() || !source.readBuffer.exhausted()) { - throw new IllegalStateException("too early; can't read the trailers yet"); - } - return source.trailers != null ? source.trailers : Util.EMPTY_HEADERS; - } - - /** - * Returns the reason why this stream was closed, or null if it closed normally or has not yet - * been closed. - */ - public synchronized ErrorCode getErrorCode() { - return errorCode; - } - - /** - * Sends a reply to an incoming stream. - * - * @param outFinished true to eagerly finish the output stream to send data to the remote peer. - * Corresponds to {@code FLAG_FIN}. - * @param flushHeaders true to force flush the response headers. This should be true unless the - * response body exists and will be written immediately. - */ - public void writeHeaders(List

responseHeaders, boolean outFinished, boolean flushHeaders) - throws IOException { - assert (!Thread.holdsLock(Http2Stream.this)); - if (responseHeaders == null) { - throw new NullPointerException("headers == null"); - } - synchronized (this) { - this.hasResponseHeaders = true; - if (outFinished) { - this.sink.finished = true; - } - } - - // Only DATA frames are subject to flow-control. Transmit the HEADER frame if the connection - // flow-control window is fully depleted. - if (!flushHeaders) { - synchronized (connection) { - flushHeaders = connection.bytesLeftInWriteWindow == 0L; - } - } - - connection.writeHeaders(id, outFinished, responseHeaders); - - if (flushHeaders) { - connection.flush(); - } - } - - public void enqueueTrailers(Headers trailers) { - synchronized (this) { - if (sink.finished) throw new IllegalStateException("already finished"); - if (trailers.size() == 0) throw new IllegalArgumentException("trailers.size() == 0"); - this.sink.trailers = trailers; - } - } - - public Timeout readTimeout() { - return readTimeout; - } - - public Timeout writeTimeout() { - return writeTimeout; - } - - /** Returns a source that reads data from the peer. */ - public Source getSource() { - return source; - } - - /** - * Returns a sink that can be used to write data to the peer. - * - * @throws IllegalStateException if this stream was initiated by the peer and a {@link - * #writeHeaders} has not yet been sent. - */ - public Sink getSink() { - synchronized (this) { - if (!hasResponseHeaders && !isLocallyInitiated()) { - throw new IllegalStateException("reply before requesting the sink"); - } - } - return sink; - } - - /** - * Abnormally terminate this stream. This blocks until the {@code RST_STREAM} frame has been - * transmitted. - */ - public void close(ErrorCode rstStatusCode, @Nullable IOException errorException) - throws IOException { - if (!closeInternal(rstStatusCode, errorException)) { - return; // Already closed. - } - connection.writeSynReset(id, rstStatusCode); - } - - /** - * Abnormally terminate this stream. This enqueues a {@code RST_STREAM} frame and returns - * immediately. - */ - public void closeLater(ErrorCode errorCode) { - if (!closeInternal(errorCode, null)) { - return; // Already closed. - } - connection.writeSynResetLater(id, errorCode); - } - - /** Returns true if this stream was closed. */ - private boolean closeInternal(ErrorCode errorCode, @Nullable IOException errorException) { - assert (!Thread.holdsLock(this)); - synchronized (this) { - if (this.errorCode != null) { - return false; - } - if (source.finished && sink.finished) { - return false; - } - this.errorCode = errorCode; - this.errorException = errorException; - notifyAll(); - } - connection.removeStream(id); - return true; - } - - void receiveData(BufferedSource in, int length) throws IOException { - assert (!Thread.holdsLock(Http2Stream.this)); - this.source.receive(in, length); - } - - /** - * Accept headers from the network and store them until the client calls {@link #takeHeaders}, or - * {@link FramingSource#read} them. - */ - void receiveHeaders(Headers headers, boolean inFinished) { - assert (!Thread.holdsLock(Http2Stream.this)); - boolean open; - synchronized (this) { - if (!hasResponseHeaders || !inFinished) { - hasResponseHeaders = true; - headersQueue.add(headers); - } else { - this.source.trailers = headers; - } - if (inFinished) { - this.source.finished = true; - } - open = isOpen(); - notifyAll(); - } - if (!open) { - connection.removeStream(id); - } - } - - synchronized void receiveRstStream(ErrorCode errorCode) { - if (this.errorCode == null) { - this.errorCode = errorCode; - notifyAll(); - } - } - - /** - * A source that reads the incoming data frames of a stream. Although this class uses - * synchronization to safely receive incoming data frames, it is not intended for use by multiple - * readers. - */ - private final class FramingSource implements Source { - /** Buffer to receive data from the network into. Only accessed by the reader thread. */ - private final Buffer receiveBuffer = new Buffer(); - - /** Buffer with readable data. Guarded by Http2Stream.this. */ - private final Buffer readBuffer = new Buffer(); - - /** Maximum number of bytes to buffer before reporting a flow control error. */ - private final long maxByteCount; - - /** - * Received trailers. Null unless the server has provided trailers. Undefined until the stream - * is exhausted. Guarded by Http2Stream.this. - */ - private Headers trailers; - - /** True if the caller has closed this stream. */ - boolean closed; - - /** - * True if either side has cleanly shut down this stream. We will receive no more bytes beyond - * those already in the buffer. - */ - boolean finished; - - FramingSource(long maxByteCount) { - this.maxByteCount = maxByteCount; - } - - @Override public long read(Buffer sink, long byteCount) throws IOException { - if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount); - - while (true) { - long readBytesDelivered = -1; - IOException errorExceptionToDeliver = null; - - // 1. Decide what to do in a synchronized block. - - synchronized (Http2Stream.this) { - readTimeout.enter(); - try { - if (errorCode != null) { - // Prepare to deliver an error. - errorExceptionToDeliver = errorException != null - ? errorException - : new StreamResetException(errorCode); - } - - if (closed) { - throw new IOException("stream closed"); - - } else if (readBuffer.size() > 0) { - // Prepare to read bytes. Start by moving them to the caller's buffer. - readBytesDelivered = readBuffer.read(sink, Math.min(byteCount, readBuffer.size())); - unacknowledgedBytesRead += readBytesDelivered; - - if (errorExceptionToDeliver == null - && unacknowledgedBytesRead - >= connection.okHttpSettings.getInitialWindowSize() / 2) { - // Flow control: notify the peer that we're ready for more data! Only send a - // WINDOW_UPDATE if the stream isn't in error. - connection.writeWindowUpdateLater(id, unacknowledgedBytesRead); - unacknowledgedBytesRead = 0; - } - } else if (!finished && errorExceptionToDeliver == null) { - // Nothing to do. Wait until that changes then try again. - waitForIo(); - continue; - } - } finally { - readTimeout.exitAndThrowIfTimedOut(); - } - } - - // 2. Do it outside of the synchronized block and timeout. - - if (readBytesDelivered != -1) { - // Update connection.unacknowledgedBytesRead outside the synchronized block. - updateConnectionFlowControl(readBytesDelivered); - return readBytesDelivered; - } - - if (errorExceptionToDeliver != null) { - // We defer throwing the exception until now so that we can refill the connection - // flow-control window. This is necessary because we don't transmit window updates until - // the application reads the data. If we throw this prior to updating the connection - // flow-control window, we risk having it go to 0 preventing the server from sending data. - throw errorExceptionToDeliver; - } - - return -1; // This source is exhausted. - } - } - - private void updateConnectionFlowControl(long read) { - assert (!Thread.holdsLock(Http2Stream.this)); - connection.updateConnectionFlowControl(read); - } - - void receive(BufferedSource in, long byteCount) throws IOException { - assert (!Thread.holdsLock(Http2Stream.this)); - - while (byteCount > 0) { - boolean finished; - boolean flowControlError; - synchronized (Http2Stream.this) { - finished = this.finished; - flowControlError = byteCount + readBuffer.size() > maxByteCount; - } - - // If the peer sends more data than we can handle, discard it and close the connection. - if (flowControlError) { - in.skip(byteCount); - closeLater(ErrorCode.FLOW_CONTROL_ERROR); - return; - } - - // Discard data received after the stream is finished. It's probably a benign race. - if (finished) { - in.skip(byteCount); - return; - } - - // Fill the receive buffer without holding any locks. - long read = in.read(receiveBuffer, byteCount); - if (read == -1) throw new EOFException(); - byteCount -= read; - - // Move the received data to the read buffer to the reader can read it. - synchronized (Http2Stream.this) { - boolean wasEmpty = readBuffer.size() == 0; - readBuffer.writeAll(receiveBuffer); - if (wasEmpty) { - Http2Stream.this.notifyAll(); - } - } - } - } - - @Override public Timeout timeout() { - return readTimeout; - } - - @Override public void close() throws IOException { - long bytesDiscarded; - synchronized (Http2Stream.this) { - closed = true; - bytesDiscarded = readBuffer.size(); - readBuffer.clear(); - Http2Stream.this.notifyAll(); // TODO(jwilson): Unnecessary? - } - if (bytesDiscarded > 0) { - updateConnectionFlowControl(bytesDiscarded); - } - cancelStreamIfNecessary(); - } - } - - void cancelStreamIfNecessary() throws IOException { - assert (!Thread.holdsLock(Http2Stream.this)); - boolean open; - boolean cancel; - synchronized (this) { - cancel = !source.finished && source.closed && (sink.finished || sink.closed); - open = isOpen(); - } - if (cancel) { - // RST this stream to prevent additional data from being sent. This - // is safe because the input stream is closed (we won't use any - // further bytes) and the output stream is either finished or closed - // (so RSTing both streams doesn't cause harm). - Http2Stream.this.close(ErrorCode.CANCEL, null); - } else if (!open) { - connection.removeStream(id); - } - } - - /** A sink that writes outgoing data frames of a stream. This class is not thread safe. */ - final class FramingSink implements Sink { - private static final long EMIT_BUFFER_SIZE = 16384; - - /** - * Buffer of outgoing data. This batches writes of small writes into this sink as larges frames - * written to the outgoing connection. Batching saves the (small) framing overhead. - */ - private final Buffer sendBuffer = new Buffer(); - - /** Trailers to send at the end of the stream. */ - private Headers trailers; - - boolean closed; - - /** - * True if either side has cleanly shut down this stream. We shall send no more bytes. - */ - boolean finished; - - @Override public void write(Buffer source, long byteCount) throws IOException { - assert (!Thread.holdsLock(Http2Stream.this)); - sendBuffer.write(source, byteCount); - while (sendBuffer.size() >= EMIT_BUFFER_SIZE) { - emitFrame(false); - } - } - - /** - * Emit a single data frame to the connection. The frame's size be limited by this stream's - * write window. This method will block until the write window is nonempty. - */ - private void emitFrame(boolean outFinishedOnLastFrame) throws IOException { - long toWrite; - synchronized (Http2Stream.this) { - writeTimeout.enter(); - try { - while (bytesLeftInWriteWindow <= 0 && !finished && !closed && errorCode == null) { - waitForIo(); // Wait until we receive a WINDOW_UPDATE for this stream. - } - } finally { - writeTimeout.exitAndThrowIfTimedOut(); - } - - checkOutNotClosed(); // Kick out if the stream was reset or closed while waiting. - toWrite = Math.min(bytesLeftInWriteWindow, sendBuffer.size()); - bytesLeftInWriteWindow -= toWrite; - } - - writeTimeout.enter(); - try { - boolean outFinished = outFinishedOnLastFrame && toWrite == sendBuffer.size(); - connection.writeData(id, outFinished, sendBuffer, toWrite); - } finally { - writeTimeout.exitAndThrowIfTimedOut(); - } - } - - @Override public void flush() throws IOException { - assert (!Thread.holdsLock(Http2Stream.this)); - synchronized (Http2Stream.this) { - checkOutNotClosed(); - } - while (sendBuffer.size() > 0) { - emitFrame(false); - connection.flush(); - } - } - - @Override public Timeout timeout() { - return writeTimeout; - } - - @Override public void close() throws IOException { - assert (!Thread.holdsLock(Http2Stream.this)); - synchronized (Http2Stream.this) { - if (closed) return; - } - if (!sink.finished) { - // We have 0 or more frames of data, and 0 or more frames of trailers. We need to send at - // least one frame with the END_STREAM flag set. That must be the last frame, and the - // trailers must be sent after all of the data. - boolean hasData = sendBuffer.size() > 0; - boolean hasTrailers = trailers != null; - if (hasTrailers) { - while (sendBuffer.size() > 0) { - emitFrame(false); - } - connection.writeHeaders(id, true, Util.toHeaderBlock(trailers)); - } else if (hasData) { - while (sendBuffer.size() > 0) { - emitFrame(true); - } - } else { - connection.writeData(id, true, null, 0); - } - } - synchronized (Http2Stream.this) { - closed = true; - } - connection.flush(); - cancelStreamIfNecessary(); - } - } - - /** - * {@code delta} will be negative if a settings frame initial window is smaller than the last. - */ - void addBytesToWriteWindow(long delta) { - bytesLeftInWriteWindow += delta; - if (delta > 0) Http2Stream.this.notifyAll(); - } - - void checkOutNotClosed() throws IOException { - if (sink.closed) { - throw new IOException("stream closed"); - } else if (sink.finished) { - throw new IOException("stream finished"); - } else if (errorCode != null) { - throw errorException != null ? errorException : new StreamResetException(errorCode); - } - } - - /** - * Like {@link #wait}, but throws an {@code InterruptedIOException} when interrupted instead of - * the more awkward {@link InterruptedException}. - */ - void waitForIo() throws InterruptedIOException { - try { - wait(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); // Retain interrupted status. - throw new InterruptedIOException(); - } - } - - /** - * The Okio timeout watchdog will call {@link #timedOut} if the timeout is reached. In that case - * we close the stream (asynchronously) which will notify the waiting thread. - */ - class StreamTimeout extends AsyncTimeout { - @Override protected void timedOut() { - closeLater(ErrorCode.CANCEL); - } - - @Override protected IOException newTimeoutException(IOException cause) { - SocketTimeoutException socketTimeoutException = new SocketTimeoutException("timeout"); - if (cause != null) { - socketTimeoutException.initCause(cause); - } - return socketTimeoutException; - } - - public void exitAndThrowIfTimedOut() throws IOException { - if (exit()) throw newTimeoutException(null /* cause */); - } - } -} diff --git a/okhttp/src/main/java/okhttp3/internal/http2/Http2Stream.kt b/okhttp/src/main/java/okhttp3/internal/http2/Http2Stream.kt new file mode 100644 index 000000000..fbefa17d0 --- /dev/null +++ b/okhttp/src/main/java/okhttp3/internal/http2/Http2Stream.kt @@ -0,0 +1,652 @@ +/* + * Copyright (C) 2011 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okhttp3.internal.http2 + +import okhttp3.Headers +import okhttp3.internal.Util +import okhttp3.internal.wait +import okio.AsyncTimeout +import okio.Buffer +import okio.BufferedSource +import okio.Sink +import okio.Source +import okio.Timeout +import java.io.EOFException +import java.io.IOException +import java.io.InterruptedIOException +import java.net.SocketTimeoutException +import java.util.ArrayDeque + +/** A logical bidirectional stream. */ +class Http2Stream internal constructor( + val id: Int, + val connection: Http2Connection, + outFinished: Boolean, + inFinished: Boolean, + headers: Headers? +) { + // Internal state is guarded by this. No long-running or potentially blocking operations are + // performed while the lock is held. + + /** + * The total number of bytes consumed by the application (with [FramingSource.read]), but + * not yet acknowledged by sending a `WINDOW_UPDATE` frame on this stream. + */ + var unacknowledgedBytesRead = 0L + internal set + + /** + * Count of bytes that can be written on the stream before receiving a window update. Even if this + * is positive, writes will block until there available bytes in + * [Http2Connection.bytesLeftInWriteWindow]. + */ + // guarded by this + var bytesLeftInWriteWindow: Long = connection.peerSettings.initialWindowSize.toLong() + internal set + + /** Received headers yet to be [taken][takeHeaders], or [read][FramingSource.read]. */ + private val headersQueue = ArrayDeque() + + /** True if response headers have been sent or received. */ + private var hasResponseHeaders: Boolean = false + + internal val source = FramingSource( + maxByteCount = connection.okHttpSettings.initialWindowSize.toLong(), + finished = inFinished + ) + internal val sink = FramingSink( + finished = outFinished + ) + internal val readTimeout = StreamTimeout() + internal val writeTimeout = StreamTimeout() + + /** + * The reason why this stream was closed, or null if it closed normally or has not yet been + * closed. + * + * If there are multiple reasons to abnormally close this stream (such as both peers closing it + * near-simultaneously) then this is the first reason known to this peer. + */ + @get:Synchronized internal var errorCode: ErrorCode? = null + + /** The exception that explains [errorCode]. Null if no exception was provided. */ + internal var errorException: IOException? = null + + init { + if (headers != null) { + check(!isLocallyInitiated) { "locally-initiated streams shouldn't have headers yet" } + headersQueue += headers + } else { + check(isLocallyInitiated) { "remotely-initiated streams should have headers" } + } + } + + /** + * Returns true if this stream is open. A stream is open until either: + * + * * A `SYN_RESET` frame abnormally terminates the stream. + * * Both input and output streams have transmitted all data and headers. + * + * Note that the input stream may continue to yield data even after a stream reports itself as + * not open. This is because input data is buffered. + */ + val isOpen: Boolean + @Synchronized get() { + if (errorCode != null) { + return false + } + if ((source.finished || source.closed) && + (sink.finished || sink.closed) && + hasResponseHeaders) { + return false + } + return true + } + + /** Returns true if this stream was created by this peer. */ + val isLocallyInitiated: Boolean + get() { + val streamIsClient = (id and 1) == 1 + return connection.client == streamIsClient + } + + /** + * Removes and returns the stream's received response headers, blocking if necessary until headers + * have been received. If the returned list contains multiple blocks of headers the blocks will be + * delimited by 'null'. + */ + @Synchronized @Throws(IOException::class) + fun takeHeaders(): Headers { + readTimeout.enter() + try { + while (headersQueue.isEmpty() && errorCode == null) { + waitForIo() + } + } finally { + readTimeout.exitAndThrowIfTimedOut() + } + if (headersQueue.isNotEmpty()) { + return headersQueue.removeFirst() + } + throw errorException ?: StreamResetException(errorCode!!) + } + + /** + * Returns the trailers. It is only safe to call this once the source stream has been completely + * exhausted. + */ + @Synchronized @Throws(IOException::class) + fun trailers(): Headers { + if (errorCode != null) { + throw errorException ?: StreamResetException(errorCode!!) + } + check(source.finished && source.receiveBuffer.exhausted() && source.readBuffer.exhausted()) { + "too early; can't read the trailers yet" + } + return source.trailers ?: Util.EMPTY_HEADERS + } + + /** + * Sends a reply to an incoming stream. + * + * @param outFinished true to eagerly finish the output stream to send data to the remote peer. + * Corresponds to `FLAG_FIN`. + * @param flushHeaders true to force flush the response headers. This should be true unless the + * response body exists and will be written immediately. + */ + @Throws(IOException::class) + fun writeHeaders(responseHeaders: List
, outFinished: Boolean, flushHeaders: Boolean) { + var flushHeaders = flushHeaders + assert(!Thread.holdsLock(this@Http2Stream)) + synchronized(this) { + this.hasResponseHeaders = true + if (outFinished) { + this.sink.finished = true + } + } + + // Only DATA frames are subject to flow-control. Transmit the HEADER frame if the connection + // flow-control window is fully depleted. + if (!flushHeaders) { + synchronized(connection) { + flushHeaders = connection.bytesLeftInWriteWindow == 0L + } + } + + connection.writeHeaders(id, outFinished, responseHeaders) + + if (flushHeaders) { + connection.flush() + } + } + + fun enqueueTrailers(trailers: Headers) { + synchronized(this) { + check(!sink.finished) { "already finished" } + require(trailers.size() != 0) { "trailers.size() == 0" } + this.sink.trailers = trailers + } + } + + fun readTimeout(): Timeout = readTimeout + + fun writeTimeout(): Timeout = writeTimeout + + /** Returns a source that reads data from the peer. */ + fun getSource(): Source = source + + /** + * Returns a sink that can be used to write data to the peer. + * + * @throws IllegalStateException if this stream was initiated by the peer and a [writeHeaders] has + * not yet been sent. + */ + fun getSink(): Sink { + synchronized(this) { + check(hasResponseHeaders || isLocallyInitiated) { + "reply before requesting the sink" + } + } + return sink + } + + /** + * Abnormally terminate this stream. This blocks until the `RST_STREAM` frame has been + * transmitted. + */ + @Throws(IOException::class) + fun close(rstStatusCode: ErrorCode, errorException: IOException?) { + if (!closeInternal(rstStatusCode, errorException)) { + return // Already closed. + } + connection.writeSynReset(id, rstStatusCode) + } + + /** + * Abnormally terminate this stream. This enqueues a `RST_STREAM` frame and returns immediately. + */ + fun closeLater(errorCode: ErrorCode) { + if (!closeInternal(errorCode, null)) { + return // Already closed. + } + connection.writeSynResetLater(id, errorCode) + } + + /** Returns true if this stream was closed. */ + private fun closeInternal(errorCode: ErrorCode, errorException: IOException?): Boolean { + assert(!Thread.holdsLock(this)) + synchronized(this) { + if (this.errorCode != null) { + return false + } + if (source.finished && sink.finished) { + return false + } + this.errorCode = errorCode + this.errorException = errorException + (this as Object).notifyAll() + } + connection.removeStream(id) + return true + } + + @Throws(IOException::class) + fun receiveData(source: BufferedSource, length: Int) { + assert(!Thread.holdsLock(this@Http2Stream)) + this.source.receive(source, length.toLong()) + } + + /** + * Accept headers from the network and store them until the client calls [takeHeaders], or + * [FramingSource.read] them. + */ + fun receiveHeaders(headers: Headers, inFinished: Boolean) { + assert(!Thread.holdsLock(this@Http2Stream)) + val open: Boolean + synchronized(this) { + if (!hasResponseHeaders || !inFinished) { + hasResponseHeaders = true + headersQueue += headers + } else { + this.source.trailers = headers + } + if (inFinished) { + this.source.finished = true + } + open = isOpen + (this as Object).notifyAll() + } + if (!open) { + connection.removeStream(id) + } + } + + @Synchronized fun receiveRstStream(errorCode: ErrorCode) { + if (this.errorCode == null) { + this.errorCode = errorCode + (this as Object).notifyAll() + } + } + + /** + * A source that reads the incoming data frames of a stream. Although this class uses + * synchronization to safely receive incoming data frames, it is not intended for use by multiple + * readers. + */ + inner class FramingSource internal constructor( + /** Maximum number of bytes to buffer before reporting a flow control error. */ + private val maxByteCount: Long, + + /** + * True if either side has cleanly shut down this stream. We will receive no more bytes beyond + * those already in the buffer. + */ + internal var finished: Boolean + ) : Source { + /** Buffer to receive data from the network into. Only accessed by the reader thread. */ + val receiveBuffer = Buffer() + + /** Buffer with readable data. Guarded by Http2Stream.this. */ + val readBuffer = Buffer() + + /** + * Received trailers. Null unless the server has provided trailers. Undefined until the stream + * is exhausted. Guarded by Http2Stream.this. + */ + var trailers: Headers? = null + + /** True if the caller has closed this stream. */ + internal var closed: Boolean = false + + @Throws(IOException::class) + override fun read(sink: Buffer, byteCount: Long): Long { + require(byteCount >= 0L) { "byteCount < 0: $byteCount" } + + while (true) { + var tryAgain = false + var readBytesDelivered = -1L + var errorExceptionToDeliver: IOException? = null + + // 1. Decide what to do in a synchronized block. + + synchronized(this@Http2Stream) { + readTimeout.enter() + try { + if (errorCode != null) { + // Prepare to deliver an error. + errorExceptionToDeliver = errorException ?: StreamResetException(errorCode!!) + } + + if (closed) { + throw IOException("stream closed") + } else if (readBuffer.size > 0) { + // Prepare to read bytes. Start by moving them to the caller's buffer. + readBytesDelivered = readBuffer.read(sink, minOf(byteCount, readBuffer.size)) + unacknowledgedBytesRead += readBytesDelivered + + if (errorExceptionToDeliver == null && + unacknowledgedBytesRead >= connection.okHttpSettings.initialWindowSize / 2) { + // Flow control: notify the peer that we're ready for more data! Only send a + // WINDOW_UPDATE if the stream isn't in error. + connection.writeWindowUpdateLater(id, unacknowledgedBytesRead) + unacknowledgedBytesRead = 0 + } + } else if (!finished && errorExceptionToDeliver == null) { + // Nothing to do. Wait until that changes then try again. + waitForIo() + tryAgain = true + } + } finally { + readTimeout.exitAndThrowIfTimedOut() + } + } + + // 2. Do it outside of the synchronized block and timeout. + + if (tryAgain) { + continue + } + + if (readBytesDelivered != -1L) { + // Update connection.unacknowledgedBytesRead outside the synchronized block. + updateConnectionFlowControl(readBytesDelivered) + return readBytesDelivered + } + + if (errorExceptionToDeliver != null) { + // We defer throwing the exception until now so that we can refill the connection + // flow-control window. This is necessary because we don't transmit window updates until + // the application reads the data. If we throw this prior to updating the connection + // flow-control window, we risk having it go to 0 preventing the server from sending data. + throw errorExceptionToDeliver!! + } + + return -1L // This source is exhausted. + } + } + + private fun updateConnectionFlowControl(read: Long) { + assert(!Thread.holdsLock(this@Http2Stream)) + connection.updateConnectionFlowControl(read) + } + + @Throws(IOException::class) + internal fun receive(source: BufferedSource, byteCount: Long) { + var byteCount = byteCount + assert(!Thread.holdsLock(this@Http2Stream)) + + while (byteCount > 0L) { + val finished: Boolean + val flowControlError: Boolean + synchronized(this@Http2Stream) { + finished = this.finished + flowControlError = byteCount + readBuffer.size > maxByteCount + } + + // If the peer sends more data than we can handle, discard it and close the connection. + if (flowControlError) { + source.skip(byteCount) + closeLater(ErrorCode.FLOW_CONTROL_ERROR) + return + } + + // Discard data received after the stream is finished. It's probably a benign race. + if (finished) { + source.skip(byteCount) + return + } + + // Fill the receive buffer without holding any locks. + val read = source.read(receiveBuffer, byteCount) + if (read == -1L) throw EOFException() + byteCount -= read + + // Move the received data to the read buffer to the reader can read it. + synchronized(this@Http2Stream) { + val wasEmpty = readBuffer.size == 0L + readBuffer.writeAll(receiveBuffer) + if (wasEmpty) { + (this@Http2Stream as Object).notifyAll() + } + } + } + } + + override fun timeout(): Timeout = readTimeout + + @Throws(IOException::class) + override fun close() { + val bytesDiscarded: Long + synchronized(this@Http2Stream) { + closed = true + bytesDiscarded = readBuffer.size + readBuffer.clear() + (this@Http2Stream as Object).notifyAll() // TODO(jwilson): Unnecessary? + } + if (bytesDiscarded > 0L) { + updateConnectionFlowControl(bytesDiscarded) + } + cancelStreamIfNecessary() + } + } + + @Throws(IOException::class) + internal fun cancelStreamIfNecessary() { + assert(!Thread.holdsLock(this@Http2Stream)) + val open: Boolean + val cancel: Boolean + synchronized(this) { + cancel = !source.finished && source.closed && (sink.finished || sink.closed) + open = isOpen + } + if (cancel) { + // RST this stream to prevent additional data from being sent. This is safe because the input + // stream is closed (we won't use any further bytes) and the output stream is either finished + // or closed (so RSTing both streams doesn't cause harm). + this@Http2Stream.close(ErrorCode.CANCEL, null) + } else if (!open) { + connection.removeStream(id) + } + } + + /** A sink that writes outgoing data frames of a stream. This class is not thread safe. */ + internal inner class FramingSink( + /** True if either side has cleanly shut down this stream. We shall send no more bytes. */ + var finished: Boolean = false + ) : Sink { + + /** + * Buffer of outgoing data. This batches writes of small writes into this sink as larges frames + * written to the outgoing connection. Batching saves the (small) framing overhead. + */ + private val sendBuffer = Buffer() + + /** Trailers to send at the end of the stream. */ + var trailers: Headers? = null + + var closed: Boolean = false + + @Throws(IOException::class) + override fun write(source: Buffer, byteCount: Long) { + assert(!Thread.holdsLock(this@Http2Stream)) + sendBuffer.write(source, byteCount) + while (sendBuffer.size >= EMIT_BUFFER_SIZE) { + emitFrame(false) + } + } + + /** + * Emit a single data frame to the connection. The frame's size be limited by this stream's + * write window. This method will block until the write window is nonempty. + */ + @Throws(IOException::class) + private fun emitFrame(outFinishedOnLastFrame: Boolean) { + val toWrite: Long + synchronized(this@Http2Stream) { + writeTimeout.enter() + try { + while (bytesLeftInWriteWindow <= 0L && !finished && !closed && errorCode == null) { + waitForIo() // Wait until we receive a WINDOW_UPDATE for this stream. + } + } finally { + writeTimeout.exitAndThrowIfTimedOut() + } + + checkOutNotClosed() // Kick out if the stream was reset or closed while waiting. + toWrite = minOf(bytesLeftInWriteWindow, sendBuffer.size) + bytesLeftInWriteWindow -= toWrite + } + + writeTimeout.enter() + try { + val outFinished = outFinishedOnLastFrame && toWrite == sendBuffer.size + connection.writeData(id, outFinished, sendBuffer, toWrite) + } finally { + writeTimeout.exitAndThrowIfTimedOut() + } + } + + @Throws(IOException::class) + override fun flush() { + assert(!Thread.holdsLock(this@Http2Stream)) + synchronized(this@Http2Stream) { + checkOutNotClosed() + } + while (sendBuffer.size > 0L) { + emitFrame(false) + connection.flush() + } + } + + override fun timeout(): Timeout = writeTimeout + + @Throws(IOException::class) + override fun close() { + assert(!Thread.holdsLock(this@Http2Stream)) + synchronized(this@Http2Stream) { + if (closed) return + } + if (!sink.finished) { + // We have 0 or more frames of data, and 0 or more frames of trailers. We need to send at + // least one frame with the END_STREAM flag set. That must be the last frame, and the + // trailers must be sent after all of the data. + val hasData = sendBuffer.size > 0L + val hasTrailers = trailers != null + when { + hasTrailers -> { + while (sendBuffer.size > 0L) { + emitFrame(false) + } + connection.writeHeaders(id, true, Util.toHeaderBlock(trailers!!)) + } + + hasData -> { + while (sendBuffer.size > 0L) { + emitFrame(true) + } + } + + else -> { + connection.writeData(id, true, null, 0L) + } + } + } + synchronized(this@Http2Stream) { + closed = true + } + connection.flush() + cancelStreamIfNecessary() + } + } + + companion object { + internal const val EMIT_BUFFER_SIZE = 16384L + } + + /** [delta] will be negative if a settings frame initial window is smaller than the last. */ + fun addBytesToWriteWindow(delta: Long) { + bytesLeftInWriteWindow += delta + if (delta > 0L) { + (this@Http2Stream as Object).notifyAll() + } + } + + @Throws(IOException::class) + internal fun checkOutNotClosed() { + when { + sink.closed -> throw IOException("stream closed") + sink.finished -> throw IOException("stream finished") + errorCode != null -> throw errorException ?: StreamResetException(errorCode!!) + } + } + + /** + * Like [Object.wait], but throws an [InterruptedIOException] when interrupted instead of the more + * awkward [InterruptedException]. + */ + @Throws(InterruptedIOException::class) + internal fun waitForIo() { + try { + wait() + } catch (_: InterruptedException) { + Thread.currentThread().interrupt() // Retain interrupted status. + throw InterruptedIOException() + } + } + + /** + * The Okio timeout watchdog will call [timedOut] if the timeout is reached. In that case we close + * the stream (asynchronously) which will notify the waiting thread. + */ + internal inner class StreamTimeout : AsyncTimeout() { + override fun timedOut() { + closeLater(ErrorCode.CANCEL) + } + + override fun newTimeoutException(cause: IOException?): IOException { + return SocketTimeoutException("timeout").apply { + if (cause != null) { + initCause(cause) + } + } + } + + @Throws(IOException::class) + fun exitAndThrowIfTimedOut() { + if (exit()) throw newTimeoutException(null) + } + } +} diff --git a/okhttp/src/test/java/okhttp3/internal/http2/Http2ConnectionTest.java b/okhttp/src/test/java/okhttp3/internal/http2/Http2ConnectionTest.java index d0333c3e9..b48830cd8 100644 --- a/okhttp/src/test/java/okhttp3/internal/http2/Http2ConnectionTest.java +++ b/okhttp/src/test/java/okhttp3/internal/http2/Http2ConnectionTest.java @@ -121,7 +121,7 @@ public final class Http2ConnectionTest { assertThat(connection.peerSettings.getInitialWindowSize()).isEqualTo(3368); // New Stream is has the most recent initial window size. - assertThat(stream.bytesLeftInWriteWindow).isEqualTo(3368); + assertThat(stream.getBytesLeftInWriteWindow()).isEqualTo(3368); } @Test public void peerHttp2ServerZerosCompressionTable() throws Exception { @@ -312,7 +312,7 @@ public final class Http2ConnectionTest { Http2Connection connection = connect(peer); connection.okHttpSettings.set(INITIAL_WINDOW_SIZE, windowSize); Http2Stream stream = connection.newStream(headerEntries("b", "banana"), false); - assertThat(stream.unacknowledgedBytesRead).isEqualTo(0); + assertThat(stream.getUnacknowledgedBytesRead()).isEqualTo(0); assertThat(stream.takeHeaders()).isEqualTo(Headers.of("a", "android")); Source in = stream.getSource(); Buffer buffer = new Buffer(); @@ -1642,7 +1642,7 @@ public final class Http2ConnectionTest { Http2Connection connection = connect(peer); connection.okHttpSettings.set(INITIAL_WINDOW_SIZE, windowSize); Http2Stream stream = connection.newStream(headerEntries("b", "banana"), false); - assertThat(stream.unacknowledgedBytesRead).isEqualTo(0); + assertThat(stream.getUnacknowledgedBytesRead()).isEqualTo(0); assertThat(stream.takeHeaders()).isEqualTo(Headers.of("a", "android")); Source in = stream.getSource(); Buffer buffer = new Buffer(); @@ -1755,13 +1755,13 @@ public final class Http2ConnectionTest { // Check that we've filled the window for both the stream and also the connection. assertThat(connection.bytesLeftInWriteWindow).isEqualTo(0); - assertThat(connection.getStream(3).bytesLeftInWriteWindow).isEqualTo(0); + assertThat(connection.getStream(3).getBytesLeftInWriteWindow()).isEqualTo(0); // receiving a window update on the connection will unblock new streams. connection.readerRunnable.windowUpdate(0, 3); assertThat(connection.bytesLeftInWriteWindow).isEqualTo(3); - assertThat(connection.getStream(3).bytesLeftInWriteWindow).isEqualTo(0); + assertThat(connection.getStream(3).getBytesLeftInWriteWindow()).isEqualTo(0); // Another stream should be able to send data even though 1 is blocked. Http2Stream stream2 = connection.newStream(headerEntries("b", "banana"), true); @@ -1770,8 +1770,8 @@ public final class Http2ConnectionTest { out2.flush(); assertThat(connection.bytesLeftInWriteWindow).isEqualTo(0); - assertThat(connection.getStream(3).bytesLeftInWriteWindow).isEqualTo(0); - assertThat(connection.getStream(5).bytesLeftInWriteWindow).isEqualTo( + assertThat(connection.getStream(3).getBytesLeftInWriteWindow()).isEqualTo(0); + assertThat(connection.getStream(5).getBytesLeftInWriteWindow()).isEqualTo( (long) (DEFAULT_INITIAL_WINDOW_SIZE - 3)); }