From 9b4e74f8c8ce6fff4281c2d7369fb25f6772a64e Mon Sep 17 00:00:00 2001 From: jwilson Date: Sat, 18 Jun 2016 14:11:20 -0400 Subject: [PATCH] A Pipe. This is something we need to implement HttpURLConnection atop the native OkHttp API. The application thread wants to be active, writing to its OutputStream. But in OkHttp's API, this work wants to happen in the RequestBody.writeTo() callback. To make this work we give the HttpUrlConnection OutputStream the sink end of the pipe, and the RequestBody the source end of the pipe. It's a bit of plumbing, but the benefit is that backpressure, timeouts, and failures will propagate as expected. --- .../java/okhttp3/internal/io/PipeTest.java | 403 ++++++++++++++++++ .../internal/io/WaitUntilNotifiedTest.java | 147 +++++++ .../main/java/okhttp3/internal/io/Pipe.java | 182 ++++++++ 3 files changed, 732 insertions(+) create mode 100644 okhttp-tests/src/test/java/okhttp3/internal/io/PipeTest.java create mode 100644 okhttp-tests/src/test/java/okhttp3/internal/io/WaitUntilNotifiedTest.java create mode 100644 okhttp/src/main/java/okhttp3/internal/io/Pipe.java diff --git a/okhttp-tests/src/test/java/okhttp3/internal/io/PipeTest.java b/okhttp-tests/src/test/java/okhttp3/internal/io/PipeTest.java new file mode 100644 index 000000000..35bfbe40d --- /dev/null +++ b/okhttp-tests/src/test/java/okhttp3/internal/io/PipeTest.java @@ -0,0 +1,403 @@ +/* + * Copyright (C) 2016 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okhttp3.internal.io; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import okio.Buffer; +import okio.ByteString; +import okio.HashingSink; +import okio.Source; +import org.junit.After; +import org.junit.Test; + +import static junit.framework.TestCase.fail; +import static org.junit.Assert.assertEquals; + +public final class PipeTest { + final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2); + + @After public void tearDown() throws Exception { + executorService.shutdown(); + } + + @Test public void test() throws Exception { + Pipe pipe = new Pipe(6); + pipe.sink.write(new Buffer().writeUtf8("abc"), 3L); + + Source source = pipe.source; + Buffer readBuffer = new Buffer(); + assertEquals(3L, source.read(readBuffer, 6L)); + assertEquals("abc", readBuffer.readUtf8()); + + pipe.sink.close(); + assertEquals(-1L, source.read(readBuffer, 6L)); + + source.close(); + } + + /** + * A producer writes the first 16 MiB of bytes generated by {@code new Random(0)} to a sink, and a + * consumer consumes them. Both compute hashes of their data to confirm that they're as expected. + */ + @Test public void largeDataset() throws Exception { + final Pipe pipe = new Pipe(1000L); // An awkward size to force producer/consumer exchange. + final long totalBytes = 16L * 1024L * 1024L; + ByteString expectedHash = ByteString.decodeHex("7c3b224bea749086babe079360cf29f98d88262d"); + + // Write data to the sink. + Future sinkHash = executorService.submit(new Callable() { + @Override public ByteString call() throws Exception { + HashingSink hashingSink = HashingSink.sha1(pipe.sink); + Random random = new Random(0); + byte[] data = new byte[8192]; + + Buffer buffer = new Buffer(); + for (long i = 0L; i < totalBytes; i += data.length) { + random.nextBytes(data); + buffer.write(data); + hashingSink.write(buffer, buffer.size()); + } + + hashingSink.close(); + return hashingSink.hash(); + } + }); + + // Read data from the source. + Future sourceHash = executorService.submit(new Callable() { + @Override public ByteString call() throws Exception { + Buffer blackhole = new Buffer(); + HashingSink hashingSink = HashingSink.sha1(blackhole); + + Buffer buffer = new Buffer(); + while (pipe.source.read(buffer, Long.MAX_VALUE) != -1) { + hashingSink.write(buffer, buffer.size()); + blackhole.clear(); + } + + pipe.source.close(); + return hashingSink.hash(); + } + }); + + assertEquals(expectedHash, sinkHash.get()); + assertEquals(expectedHash, sourceHash.get()); + } + + @Test public void sinkTimeout() throws Exception { + Pipe pipe = new Pipe(3); + pipe.sink.timeout().timeout(250, TimeUnit.MILLISECONDS); + pipe.sink.write(new Buffer().writeUtf8("abc"), 3L); + double start = now(); + try { + pipe.sink.write(new Buffer().writeUtf8("def"), 3L); + fail(); + } catch (InterruptedIOException expected) { + assertEquals("timeout", expected.getMessage()); + } + assertElapsed(250.0, start); + + Buffer readBuffer = new Buffer(); + assertEquals(3L, pipe.source.read(readBuffer, 6L)); + assertEquals("abc", readBuffer.readUtf8()); + } + + @Test public void sourceTimeout() throws Exception { + Pipe pipe = new Pipe(3L); + pipe.source.timeout().timeout(250, TimeUnit.MILLISECONDS); + double start = now(); + Buffer readBuffer = new Buffer(); + try { + pipe.source.read(readBuffer, 6L); + fail(); + } catch (InterruptedIOException expected) { + assertEquals("timeout", expected.getMessage()); + } + assertElapsed(250.0, start); + assertEquals(0, readBuffer.size()); + } + + /** + * The writer is writing 12 bytes as fast as it can to a 3 byte buffer. The reader alternates + * sleeping 250 ms, then reading 3 bytes. That should make for an approximate timeline like this: + * + * 0: writer writes 'abc', blocks 0: reader sleeps until 250 250: reader reads 'abc', sleeps until + * 500 250: writer writes 'def', blocks 500: reader reads 'def', sleeps until 750 500: writer + * writes 'ghi', blocks 750: reader reads 'ghi', sleeps until 1000 750: writer writes 'jkl', + * returns 1000: reader reads 'jkl', returns + * + * Because the writer is writing to a buffer, it finishes before the reader does. + */ + @Test public void sinkBlocksOnSlowReader() throws Exception { + final Pipe pipe = new Pipe(3L); + executorService.execute(new Runnable() { + @Override public void run() { + try { + Buffer buffer = new Buffer(); + Thread.sleep(250L); + assertEquals(3, pipe.source.read(buffer, Long.MAX_VALUE)); + assertEquals("abc", buffer.readUtf8()); + Thread.sleep(250L); + assertEquals(3, pipe.source.read(buffer, Long.MAX_VALUE)); + assertEquals("def", buffer.readUtf8()); + Thread.sleep(250L); + assertEquals(3, pipe.source.read(buffer, Long.MAX_VALUE)); + assertEquals("ghi", buffer.readUtf8()); + Thread.sleep(250L); + assertEquals(3, pipe.source.read(buffer, Long.MAX_VALUE)); + assertEquals("jkl", buffer.readUtf8()); + } catch (IOException | InterruptedException e) { + throw new AssertionError(); + } + } + }); + + double start = now(); + pipe.sink.write(new Buffer().writeUtf8("abcdefghijkl"), 12); + assertElapsed(750.0, start); + } + + @Test public void sinkWriteFailsByClosedReader() throws Exception { + final Pipe pipe = new Pipe(3L); + executorService.schedule(new Runnable() { + @Override public void run() { + try { + pipe.source.close(); + } catch (IOException e) { + throw new AssertionError(); + } + } + }, 250, TimeUnit.MILLISECONDS); + + double start = now(); + try { + pipe.sink.write(new Buffer().writeUtf8("abcdef"), 6); + fail(); + } catch (IOException expected) { + assertEquals("source is closed", expected.getMessage()); + assertElapsed(250.0, start); + } + } + + @Test public void sinkFlushWaitsForReaderToReadEverything() throws Exception { + final Buffer readBuffer = new Buffer(); + final Pipe pipe = new Pipe(100L); + executorService.execute(new Runnable() { + @Override public void run() { + try { + Thread.sleep(250); + pipe.source.read(readBuffer, 3); + Thread.sleep(250); + pipe.source.read(readBuffer, 3); + } catch (InterruptedException | IOException e) { + throw new AssertionError(e); + } + } + }); + + double start = now(); + pipe.sink.write(new Buffer().writeUtf8("abcdef"), 6); + pipe.sink.flush(); + assertElapsed(500.0, start); + assertEquals("abcdef", readBuffer.readUtf8()); + } + + @Test public void sinkFlushFailsIfReaderIsClosedBeforeAllDataIsRead() throws Exception { + final Pipe pipe = new Pipe(100L); + executorService.execute(new Runnable() { + @Override public void run() { + try { + Thread.sleep(250); + pipe.source.read(new Buffer(), 3); + Thread.sleep(250); + pipe.source.close(); + } catch (InterruptedException | IOException e) { + throw new AssertionError(e); + } + } + }); + + double start = now(); + pipe.sink.write(new Buffer().writeUtf8("abcdef"), 6); + try { + pipe.sink.flush(); + fail(); + } catch (IOException expected) { + assertEquals("source is closed", expected.getMessage()); + assertElapsed(500.0, start); + } + } + + @Test public void sinkCloseFailsIfReaderIsClosedBeforeAllDataIsRead() throws Exception { + final Pipe pipe = new Pipe(100L); + executorService.execute(new Runnable() { + @Override public void run() { + try { + Thread.sleep(250); + pipe.source.read(new Buffer(), 3); + Thread.sleep(250); + pipe.source.close(); + } catch (InterruptedException | IOException e) { + throw new AssertionError(e); + } + } + }); + + double start = now(); + pipe.sink.write(new Buffer().writeUtf8("abcdef"), 6); + try { + pipe.sink.close(); + fail(); + } catch (IOException expected) { + assertEquals("source is closed", expected.getMessage()); + assertElapsed(500.0, start); + } + + try { + pipe.sink.flush(); + fail(); + } catch (IllegalStateException expected) { + assertEquals("closed", expected.getMessage()); + } + } + + @Test public void sinkClose() throws Exception { + Pipe pipe = new Pipe(100L); + pipe.sink.close(); + try { + pipe.sink.write(new Buffer().writeUtf8("abc"), 3); + fail(); + } catch (IllegalStateException expected) { + assertEquals("closed", expected.getMessage()); + } + try { + pipe.sink.flush(); + fail(); + } catch (IllegalStateException expected) { + assertEquals("closed", expected.getMessage()); + } + } + + @Test public void sinkMultipleClose() throws Exception { + Pipe pipe = new Pipe(100L); + pipe.sink.close(); + pipe.sink.close(); + } + + @Test public void sourceClose() throws Exception { + Pipe pipe = new Pipe(100L); + pipe.source.close(); + try { + pipe.source.read(new Buffer(), 3); + fail(); + } catch (IllegalStateException expected) { + assertEquals("closed", expected.getMessage()); + } + } + + @Test public void sourceMultipleClose() throws Exception { + Pipe pipe = new Pipe(100L); + pipe.source.close(); + pipe.source.close(); + } + + @Test public void sourceReadUnblockedByClosedSink() throws Exception { + final Pipe pipe = new Pipe(3L); + executorService.schedule(new Runnable() { + @Override public void run() { + try { + pipe.sink.close(); + } catch (IOException e) { + throw new AssertionError(); + } + } + }, 250, TimeUnit.MILLISECONDS); + + double start = now(); + Buffer readBuffer = new Buffer(); + assertEquals(-1, pipe.source.read(readBuffer, Long.MAX_VALUE)); + assertEquals(0, readBuffer.size()); + assertElapsed(250.0, start); + } + + /** + * The writer has 12 bytes to write. It alternates sleeping 250 ms, then writing 3 bytes. The + * reader is reading as fast as it can. That should make for an approximate timeline like this: + * + * 0: writer sleeps until 250 0: reader blocks 250: writer writes 'abc', sleeps until 500 250: + * reader reads 'abc' 500: writer writes 'def', sleeps until 750 500: reader reads 'def' 750: + * writer writes 'ghi', sleeps until 1000 750: reader reads 'ghi' 1000: writer writes 'jkl', + * returns 1000: reader reads 'jkl', returns + */ + @Test public void sourceBlocksOnSlowWriter() throws Exception { + final Pipe pipe = new Pipe(100L); + executorService.execute(new Runnable() { + @Override public void run() { + try { + Thread.sleep(250L); + pipe.sink.write(new Buffer().writeUtf8("abc"), 3); + Thread.sleep(250L); + pipe.sink.write(new Buffer().writeUtf8("def"), 3); + Thread.sleep(250L); + pipe.sink.write(new Buffer().writeUtf8("ghi"), 3); + Thread.sleep(250L); + pipe.sink.write(new Buffer().writeUtf8("jkl"), 3); + } catch (IOException | InterruptedException e) { + throw new AssertionError(); + } + } + }); + + double start = now(); + Buffer readBuffer = new Buffer(); + + assertEquals(3, pipe.source.read(readBuffer, Long.MAX_VALUE)); + assertEquals("abc", readBuffer.readUtf8()); + assertElapsed(250.0, start); + + assertEquals(3, pipe.source.read(readBuffer, Long.MAX_VALUE)); + assertEquals("def", readBuffer.readUtf8()); + assertElapsed(500.0, start); + + assertEquals(3, pipe.source.read(readBuffer, Long.MAX_VALUE)); + assertEquals("ghi", readBuffer.readUtf8()); + assertElapsed(750.0, start); + + assertEquals(3, pipe.source.read(readBuffer, Long.MAX_VALUE)); + assertEquals("jkl", readBuffer.readUtf8()); + assertElapsed(1000.0, start); + } + + /** Returns the nanotime in milliseconds as a double for measuring timeouts. */ + private double now() { + return System.nanoTime() / 1000000.0d; + } + + /** + * Fails the test unless the time from start until now is duration, accepting differences in + * -50..+150 milliseconds. + */ + private void assertElapsed(double duration, double start) { + assertEquals(duration, now() - start + 50d, 100.0); + } +} diff --git a/okhttp-tests/src/test/java/okhttp3/internal/io/WaitUntilNotifiedTest.java b/okhttp-tests/src/test/java/okhttp3/internal/io/WaitUntilNotifiedTest.java new file mode 100644 index 000000000..443b5a24e --- /dev/null +++ b/okhttp-tests/src/test/java/okhttp3/internal/io/WaitUntilNotifiedTest.java @@ -0,0 +1,147 @@ +/* + * Copyright (C) 2016 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okhttp3.internal.io; + +import java.io.InterruptedIOException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import okio.Timeout; +import org.junit.After; +import org.junit.Test; + +import static junit.framework.TestCase.fail; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +public final class WaitUntilNotifiedTest { + final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(0); + + @After public void tearDown() throws Exception { + executorService.shutdown(); + } + + @Test public synchronized void notified() throws Exception { + Timeout timeout = new Timeout(); + timeout.timeout(5000, TimeUnit.MILLISECONDS); + + double start = now(); + executorService.schedule(new Runnable() { + @Override public void run() { + synchronized (WaitUntilNotifiedTest.this) { + WaitUntilNotifiedTest.this.notify(); + } + } + }, 250, TimeUnit.MILLISECONDS); + + Pipe.waitUntilNotified(this, timeout); + assertElapsed(250.0, start); + } + + @Test public synchronized void timeout() throws Exception { + Timeout timeout = new Timeout(); + timeout.timeout(250, TimeUnit.MILLISECONDS); + double start = now(); + try { + Pipe.waitUntilNotified(this, timeout); + fail(); + } catch (InterruptedIOException expected) { + assertEquals("timeout", expected.getMessage()); + } + assertElapsed(250.0, start); + } + + @Test public synchronized void deadline() throws Exception { + Timeout timeout = new Timeout(); + timeout.deadline(250, TimeUnit.MILLISECONDS); + double start = now(); + try { + Pipe.waitUntilNotified(this, timeout); + fail(); + } catch (InterruptedIOException expected) { + assertEquals("timeout", expected.getMessage()); + } + assertElapsed(250.0, start); + } + + @Test public synchronized void deadlineBeforeTimeout() throws Exception { + Timeout timeout = new Timeout(); + timeout.timeout(5000, TimeUnit.MILLISECONDS); + timeout.deadline(250, TimeUnit.MILLISECONDS); + double start = now(); + try { + Pipe.waitUntilNotified(this, timeout); + fail(); + } catch (InterruptedIOException expected) { + assertEquals("timeout", expected.getMessage()); + } + assertElapsed(250.0, start); + } + + @Test public synchronized void timeoutBeforeDeadline() throws Exception { + Timeout timeout = new Timeout(); + timeout.timeout(250, TimeUnit.MILLISECONDS); + timeout.deadline(5000, TimeUnit.MILLISECONDS); + double start = now(); + try { + Pipe.waitUntilNotified(this, timeout); + fail(); + } catch (InterruptedIOException expected) { + assertEquals("timeout", expected.getMessage()); + } + assertElapsed(250.0, start); + } + + @Test public synchronized void deadlineAlreadyReached() throws Exception { + Timeout timeout = new Timeout(); + timeout.deadlineNanoTime(System.nanoTime()); + double start = now(); + try { + Pipe.waitUntilNotified(this, timeout); + fail(); + } catch (InterruptedIOException expected) { + assertEquals("timeout", expected.getMessage()); + } + assertElapsed(0.0, start); + } + + @Test public synchronized void threadInterrupted() throws Exception { + Timeout timeout = new Timeout(); + double start = now(); + Thread.currentThread().interrupt(); + try { + Pipe.waitUntilNotified(this, timeout); + fail(); + } catch (InterruptedIOException expected) { + assertEquals("interrupted", expected.getMessage()); + assertFalse(Thread.interrupted()); + } + assertElapsed(0.0, start); + } + + /** Returns the nanotime in milliseconds as a double for measuring timeouts. */ + private double now() { + return System.nanoTime() / 1000000.0d; + } + + /** + * Fails the test unless the time from start until now is duration, accepting differences in + * -50..+150 milliseconds. + */ + private void assertElapsed(double duration, double start) { + assertEquals(duration, now() - start + 50d, 100.0); + } +} diff --git a/okhttp/src/main/java/okhttp3/internal/io/Pipe.java b/okhttp/src/main/java/okhttp3/internal/io/Pipe.java new file mode 100644 index 000000000..f1c30578c --- /dev/null +++ b/okhttp/src/main/java/okhttp3/internal/io/Pipe.java @@ -0,0 +1,182 @@ +/* + * Copyright (C) 2016 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okhttp3.internal.io; + +import java.io.IOException; +import java.io.InterruptedIOException; +import okio.Buffer; +import okio.Sink; +import okio.Source; +import okio.Timeout; + +/** + * A source and a sink that are attached. The sink's output is the source's input. Typically each + * is accessed by its own thread: a producer thread writes data to the sink and a consumer thread + * reads data from the source. + * + *

This class uses a buffer to decouple source and sink. This buffer has a user-specified maximum + * size. When a producer thread outruns its consumer the buffer fills up and eventually writes to + * the sink will block until the consumer has caught up. Symmetrically, if a consumer outruns its + * producer reads block until there is data to be read. Limits on the amount of time spent waiting + * for the other party can be configured with {@linkplain Timeout timeouts} on the source and the + * sink. + * + *

When the sink is closed, source reads will continue to complete normally until the buffer has + * been exhausted. At that point reads will return -1, indicating the end of the stream. But if the + * source is closed first, writes to the sink will immediately fail with an {@link IOException}. + */ +public final class Pipe { + final long maxBufferSize; + final Buffer buffer = new Buffer(); + boolean sinkClosed; + boolean sourceClosed; + public final Sink sink = new PipeSink(); + public final Source source = new PipeSource(); + + public Pipe(long maxBufferSize) { + if (maxBufferSize < 1L) { + throw new IllegalArgumentException("maxBufferSize < 1: " + maxBufferSize); + } + this.maxBufferSize = maxBufferSize; + } + + final class PipeSink implements Sink { + final Timeout timeout = new Timeout(); + + @Override public void write(Buffer source, long byteCount) throws IOException { + synchronized (buffer) { + if (sinkClosed) throw new IllegalStateException("closed"); + + while (byteCount > 0) { + if (sourceClosed) throw new IOException("source is closed"); + + long bufferSpaceAvailable = maxBufferSize - buffer.size(); + if (bufferSpaceAvailable == 0) { + waitUntilNotified(buffer, timeout); // Wait until the source drains the buffer. + continue; + } + + long bytesToWrite = Math.min(bufferSpaceAvailable, byteCount); + buffer.write(source, bytesToWrite); + byteCount -= bytesToWrite; + buffer.notifyAll(); // Notify the source that it can resume reading. + } + } + } + + @Override public void flush() throws IOException { + synchronized (buffer) { + if (sinkClosed) throw new IllegalStateException("closed"); + + while (buffer.size() > 0) { + if (sourceClosed) throw new IOException("source is closed"); + waitUntilNotified(buffer, timeout); + } + } + } + + @Override public void close() throws IOException { + synchronized (buffer) { + if (sinkClosed) return; + try { + flush(); + } finally { + sinkClosed = true; + buffer.notifyAll(); // Notify the source that no more bytes are coming. + } + } + } + + @Override public Timeout timeout() { + return timeout; + } + } + + final class PipeSource implements Source { + final Timeout timeout = new Timeout(); + + @Override public long read(Buffer sink, long byteCount) throws IOException { + synchronized (buffer) { + if (sourceClosed) throw new IllegalStateException("closed"); + + while (buffer.size() == 0) { + if (sinkClosed) return -1L; + waitUntilNotified(buffer, timeout); // Wait until the sink fills the buffer. + } + + long result = buffer.read(sink, byteCount); + buffer.notifyAll(); // Notify the sink that it can resume writing. + return result; + } + } + + @Override public void close() throws IOException { + synchronized (buffer) { + sourceClosed = true; + buffer.notifyAll(); // Notify the sink that no more bytes are desired. + } + } + + @Override public Timeout timeout() { + return timeout; + } + } + + /** + * Waits on {@code monitor} until it is notified. Throws {@link InterruptedIOException} if either + * the thread is interrupted or if {@code timeout} elapses before {@code monitor} is notified. The + * caller must be synchronized on {@code monitor}. + */ + static void waitUntilNotified(Object monitor, Timeout timeout) throws IOException { + try { + boolean hasDeadline = timeout.hasDeadline(); + long timeoutNanos = timeout.timeoutNanos(); + + if (!hasDeadline && timeoutNanos == 0L) { + monitor.wait(); // There is no timeout: wait forever. + return; + } + + // Compute how long we'll wait. + long waitNanos; + long start = System.nanoTime(); + if (hasDeadline && timeoutNanos != 0) { + long deadlineNanos = timeout.deadlineNanoTime() - start; + waitNanos = Math.min(timeoutNanos, deadlineNanos); + } else if (hasDeadline) { + waitNanos = timeout.deadlineNanoTime() - start; + } else { + waitNanos = timeoutNanos; + } + + // Attempt to wait that long. This will break out early if the monitor is notified. + long elapsedNanos = 0L; + if (waitNanos > 0L) { + long waitMillis = waitNanos / 1000000L; + monitor.wait(waitMillis, (int) (waitNanos - waitMillis * 1000000L)); + elapsedNanos = System.nanoTime() - start; + } + + // Throw if the timeout elapsed before the monitor was notified. + if (elapsedNanos >= waitNanos) { + throw new InterruptedIOException("timeout"); + } + } catch (InterruptedException e) { + throw new InterruptedIOException("interrupted"); + } + } +} +