diff --git a/okhttp-tests/src/test/java/okhttp3/internal/io/PipeTest.java b/okhttp-tests/src/test/java/okhttp3/internal/io/PipeTest.java new file mode 100644 index 000000000..35bfbe40d --- /dev/null +++ b/okhttp-tests/src/test/java/okhttp3/internal/io/PipeTest.java @@ -0,0 +1,403 @@ +/* + * Copyright (C) 2016 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okhttp3.internal.io; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import okio.Buffer; +import okio.ByteString; +import okio.HashingSink; +import okio.Source; +import org.junit.After; +import org.junit.Test; + +import static junit.framework.TestCase.fail; +import static org.junit.Assert.assertEquals; + +public final class PipeTest { + final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2); + + @After public void tearDown() throws Exception { + executorService.shutdown(); + } + + @Test public void test() throws Exception { + Pipe pipe = new Pipe(6); + pipe.sink.write(new Buffer().writeUtf8("abc"), 3L); + + Source source = pipe.source; + Buffer readBuffer = new Buffer(); + assertEquals(3L, source.read(readBuffer, 6L)); + assertEquals("abc", readBuffer.readUtf8()); + + pipe.sink.close(); + assertEquals(-1L, source.read(readBuffer, 6L)); + + source.close(); + } + + /** + * A producer writes the first 16 MiB of bytes generated by {@code new Random(0)} to a sink, and a + * consumer consumes them. Both compute hashes of their data to confirm that they're as expected. + */ + @Test public void largeDataset() throws Exception { + final Pipe pipe = new Pipe(1000L); // An awkward size to force producer/consumer exchange. + final long totalBytes = 16L * 1024L * 1024L; + ByteString expectedHash = ByteString.decodeHex("7c3b224bea749086babe079360cf29f98d88262d"); + + // Write data to the sink. + Future sinkHash = executorService.submit(new Callable() { + @Override public ByteString call() throws Exception { + HashingSink hashingSink = HashingSink.sha1(pipe.sink); + Random random = new Random(0); + byte[] data = new byte[8192]; + + Buffer buffer = new Buffer(); + for (long i = 0L; i < totalBytes; i += data.length) { + random.nextBytes(data); + buffer.write(data); + hashingSink.write(buffer, buffer.size()); + } + + hashingSink.close(); + return hashingSink.hash(); + } + }); + + // Read data from the source. + Future sourceHash = executorService.submit(new Callable() { + @Override public ByteString call() throws Exception { + Buffer blackhole = new Buffer(); + HashingSink hashingSink = HashingSink.sha1(blackhole); + + Buffer buffer = new Buffer(); + while (pipe.source.read(buffer, Long.MAX_VALUE) != -1) { + hashingSink.write(buffer, buffer.size()); + blackhole.clear(); + } + + pipe.source.close(); + return hashingSink.hash(); + } + }); + + assertEquals(expectedHash, sinkHash.get()); + assertEquals(expectedHash, sourceHash.get()); + } + + @Test public void sinkTimeout() throws Exception { + Pipe pipe = new Pipe(3); + pipe.sink.timeout().timeout(250, TimeUnit.MILLISECONDS); + pipe.sink.write(new Buffer().writeUtf8("abc"), 3L); + double start = now(); + try { + pipe.sink.write(new Buffer().writeUtf8("def"), 3L); + fail(); + } catch (InterruptedIOException expected) { + assertEquals("timeout", expected.getMessage()); + } + assertElapsed(250.0, start); + + Buffer readBuffer = new Buffer(); + assertEquals(3L, pipe.source.read(readBuffer, 6L)); + assertEquals("abc", readBuffer.readUtf8()); + } + + @Test public void sourceTimeout() throws Exception { + Pipe pipe = new Pipe(3L); + pipe.source.timeout().timeout(250, TimeUnit.MILLISECONDS); + double start = now(); + Buffer readBuffer = new Buffer(); + try { + pipe.source.read(readBuffer, 6L); + fail(); + } catch (InterruptedIOException expected) { + assertEquals("timeout", expected.getMessage()); + } + assertElapsed(250.0, start); + assertEquals(0, readBuffer.size()); + } + + /** + * The writer is writing 12 bytes as fast as it can to a 3 byte buffer. The reader alternates + * sleeping 250 ms, then reading 3 bytes. That should make for an approximate timeline like this: + * + * 0: writer writes 'abc', blocks 0: reader sleeps until 250 250: reader reads 'abc', sleeps until + * 500 250: writer writes 'def', blocks 500: reader reads 'def', sleeps until 750 500: writer + * writes 'ghi', blocks 750: reader reads 'ghi', sleeps until 1000 750: writer writes 'jkl', + * returns 1000: reader reads 'jkl', returns + * + * Because the writer is writing to a buffer, it finishes before the reader does. + */ + @Test public void sinkBlocksOnSlowReader() throws Exception { + final Pipe pipe = new Pipe(3L); + executorService.execute(new Runnable() { + @Override public void run() { + try { + Buffer buffer = new Buffer(); + Thread.sleep(250L); + assertEquals(3, pipe.source.read(buffer, Long.MAX_VALUE)); + assertEquals("abc", buffer.readUtf8()); + Thread.sleep(250L); + assertEquals(3, pipe.source.read(buffer, Long.MAX_VALUE)); + assertEquals("def", buffer.readUtf8()); + Thread.sleep(250L); + assertEquals(3, pipe.source.read(buffer, Long.MAX_VALUE)); + assertEquals("ghi", buffer.readUtf8()); + Thread.sleep(250L); + assertEquals(3, pipe.source.read(buffer, Long.MAX_VALUE)); + assertEquals("jkl", buffer.readUtf8()); + } catch (IOException | InterruptedException e) { + throw new AssertionError(); + } + } + }); + + double start = now(); + pipe.sink.write(new Buffer().writeUtf8("abcdefghijkl"), 12); + assertElapsed(750.0, start); + } + + @Test public void sinkWriteFailsByClosedReader() throws Exception { + final Pipe pipe = new Pipe(3L); + executorService.schedule(new Runnable() { + @Override public void run() { + try { + pipe.source.close(); + } catch (IOException e) { + throw new AssertionError(); + } + } + }, 250, TimeUnit.MILLISECONDS); + + double start = now(); + try { + pipe.sink.write(new Buffer().writeUtf8("abcdef"), 6); + fail(); + } catch (IOException expected) { + assertEquals("source is closed", expected.getMessage()); + assertElapsed(250.0, start); + } + } + + @Test public void sinkFlushWaitsForReaderToReadEverything() throws Exception { + final Buffer readBuffer = new Buffer(); + final Pipe pipe = new Pipe(100L); + executorService.execute(new Runnable() { + @Override public void run() { + try { + Thread.sleep(250); + pipe.source.read(readBuffer, 3); + Thread.sleep(250); + pipe.source.read(readBuffer, 3); + } catch (InterruptedException | IOException e) { + throw new AssertionError(e); + } + } + }); + + double start = now(); + pipe.sink.write(new Buffer().writeUtf8("abcdef"), 6); + pipe.sink.flush(); + assertElapsed(500.0, start); + assertEquals("abcdef", readBuffer.readUtf8()); + } + + @Test public void sinkFlushFailsIfReaderIsClosedBeforeAllDataIsRead() throws Exception { + final Pipe pipe = new Pipe(100L); + executorService.execute(new Runnable() { + @Override public void run() { + try { + Thread.sleep(250); + pipe.source.read(new Buffer(), 3); + Thread.sleep(250); + pipe.source.close(); + } catch (InterruptedException | IOException e) { + throw new AssertionError(e); + } + } + }); + + double start = now(); + pipe.sink.write(new Buffer().writeUtf8("abcdef"), 6); + try { + pipe.sink.flush(); + fail(); + } catch (IOException expected) { + assertEquals("source is closed", expected.getMessage()); + assertElapsed(500.0, start); + } + } + + @Test public void sinkCloseFailsIfReaderIsClosedBeforeAllDataIsRead() throws Exception { + final Pipe pipe = new Pipe(100L); + executorService.execute(new Runnable() { + @Override public void run() { + try { + Thread.sleep(250); + pipe.source.read(new Buffer(), 3); + Thread.sleep(250); + pipe.source.close(); + } catch (InterruptedException | IOException e) { + throw new AssertionError(e); + } + } + }); + + double start = now(); + pipe.sink.write(new Buffer().writeUtf8("abcdef"), 6); + try { + pipe.sink.close(); + fail(); + } catch (IOException expected) { + assertEquals("source is closed", expected.getMessage()); + assertElapsed(500.0, start); + } + + try { + pipe.sink.flush(); + fail(); + } catch (IllegalStateException expected) { + assertEquals("closed", expected.getMessage()); + } + } + + @Test public void sinkClose() throws Exception { + Pipe pipe = new Pipe(100L); + pipe.sink.close(); + try { + pipe.sink.write(new Buffer().writeUtf8("abc"), 3); + fail(); + } catch (IllegalStateException expected) { + assertEquals("closed", expected.getMessage()); + } + try { + pipe.sink.flush(); + fail(); + } catch (IllegalStateException expected) { + assertEquals("closed", expected.getMessage()); + } + } + + @Test public void sinkMultipleClose() throws Exception { + Pipe pipe = new Pipe(100L); + pipe.sink.close(); + pipe.sink.close(); + } + + @Test public void sourceClose() throws Exception { + Pipe pipe = new Pipe(100L); + pipe.source.close(); + try { + pipe.source.read(new Buffer(), 3); + fail(); + } catch (IllegalStateException expected) { + assertEquals("closed", expected.getMessage()); + } + } + + @Test public void sourceMultipleClose() throws Exception { + Pipe pipe = new Pipe(100L); + pipe.source.close(); + pipe.source.close(); + } + + @Test public void sourceReadUnblockedByClosedSink() throws Exception { + final Pipe pipe = new Pipe(3L); + executorService.schedule(new Runnable() { + @Override public void run() { + try { + pipe.sink.close(); + } catch (IOException e) { + throw new AssertionError(); + } + } + }, 250, TimeUnit.MILLISECONDS); + + double start = now(); + Buffer readBuffer = new Buffer(); + assertEquals(-1, pipe.source.read(readBuffer, Long.MAX_VALUE)); + assertEquals(0, readBuffer.size()); + assertElapsed(250.0, start); + } + + /** + * The writer has 12 bytes to write. It alternates sleeping 250 ms, then writing 3 bytes. The + * reader is reading as fast as it can. That should make for an approximate timeline like this: + * + * 0: writer sleeps until 250 0: reader blocks 250: writer writes 'abc', sleeps until 500 250: + * reader reads 'abc' 500: writer writes 'def', sleeps until 750 500: reader reads 'def' 750: + * writer writes 'ghi', sleeps until 1000 750: reader reads 'ghi' 1000: writer writes 'jkl', + * returns 1000: reader reads 'jkl', returns + */ + @Test public void sourceBlocksOnSlowWriter() throws Exception { + final Pipe pipe = new Pipe(100L); + executorService.execute(new Runnable() { + @Override public void run() { + try { + Thread.sleep(250L); + pipe.sink.write(new Buffer().writeUtf8("abc"), 3); + Thread.sleep(250L); + pipe.sink.write(new Buffer().writeUtf8("def"), 3); + Thread.sleep(250L); + pipe.sink.write(new Buffer().writeUtf8("ghi"), 3); + Thread.sleep(250L); + pipe.sink.write(new Buffer().writeUtf8("jkl"), 3); + } catch (IOException | InterruptedException e) { + throw new AssertionError(); + } + } + }); + + double start = now(); + Buffer readBuffer = new Buffer(); + + assertEquals(3, pipe.source.read(readBuffer, Long.MAX_VALUE)); + assertEquals("abc", readBuffer.readUtf8()); + assertElapsed(250.0, start); + + assertEquals(3, pipe.source.read(readBuffer, Long.MAX_VALUE)); + assertEquals("def", readBuffer.readUtf8()); + assertElapsed(500.0, start); + + assertEquals(3, pipe.source.read(readBuffer, Long.MAX_VALUE)); + assertEquals("ghi", readBuffer.readUtf8()); + assertElapsed(750.0, start); + + assertEquals(3, pipe.source.read(readBuffer, Long.MAX_VALUE)); + assertEquals("jkl", readBuffer.readUtf8()); + assertElapsed(1000.0, start); + } + + /** Returns the nanotime in milliseconds as a double for measuring timeouts. */ + private double now() { + return System.nanoTime() / 1000000.0d; + } + + /** + * Fails the test unless the time from start until now is duration, accepting differences in + * -50..+150 milliseconds. + */ + private void assertElapsed(double duration, double start) { + assertEquals(duration, now() - start + 50d, 100.0); + } +} diff --git a/okhttp-tests/src/test/java/okhttp3/internal/io/WaitUntilNotifiedTest.java b/okhttp-tests/src/test/java/okhttp3/internal/io/WaitUntilNotifiedTest.java new file mode 100644 index 000000000..443b5a24e --- /dev/null +++ b/okhttp-tests/src/test/java/okhttp3/internal/io/WaitUntilNotifiedTest.java @@ -0,0 +1,147 @@ +/* + * Copyright (C) 2016 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okhttp3.internal.io; + +import java.io.InterruptedIOException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import okio.Timeout; +import org.junit.After; +import org.junit.Test; + +import static junit.framework.TestCase.fail; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +public final class WaitUntilNotifiedTest { + final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(0); + + @After public void tearDown() throws Exception { + executorService.shutdown(); + } + + @Test public synchronized void notified() throws Exception { + Timeout timeout = new Timeout(); + timeout.timeout(5000, TimeUnit.MILLISECONDS); + + double start = now(); + executorService.schedule(new Runnable() { + @Override public void run() { + synchronized (WaitUntilNotifiedTest.this) { + WaitUntilNotifiedTest.this.notify(); + } + } + }, 250, TimeUnit.MILLISECONDS); + + Pipe.waitUntilNotified(this, timeout); + assertElapsed(250.0, start); + } + + @Test public synchronized void timeout() throws Exception { + Timeout timeout = new Timeout(); + timeout.timeout(250, TimeUnit.MILLISECONDS); + double start = now(); + try { + Pipe.waitUntilNotified(this, timeout); + fail(); + } catch (InterruptedIOException expected) { + assertEquals("timeout", expected.getMessage()); + } + assertElapsed(250.0, start); + } + + @Test public synchronized void deadline() throws Exception { + Timeout timeout = new Timeout(); + timeout.deadline(250, TimeUnit.MILLISECONDS); + double start = now(); + try { + Pipe.waitUntilNotified(this, timeout); + fail(); + } catch (InterruptedIOException expected) { + assertEquals("timeout", expected.getMessage()); + } + assertElapsed(250.0, start); + } + + @Test public synchronized void deadlineBeforeTimeout() throws Exception { + Timeout timeout = new Timeout(); + timeout.timeout(5000, TimeUnit.MILLISECONDS); + timeout.deadline(250, TimeUnit.MILLISECONDS); + double start = now(); + try { + Pipe.waitUntilNotified(this, timeout); + fail(); + } catch (InterruptedIOException expected) { + assertEquals("timeout", expected.getMessage()); + } + assertElapsed(250.0, start); + } + + @Test public synchronized void timeoutBeforeDeadline() throws Exception { + Timeout timeout = new Timeout(); + timeout.timeout(250, TimeUnit.MILLISECONDS); + timeout.deadline(5000, TimeUnit.MILLISECONDS); + double start = now(); + try { + Pipe.waitUntilNotified(this, timeout); + fail(); + } catch (InterruptedIOException expected) { + assertEquals("timeout", expected.getMessage()); + } + assertElapsed(250.0, start); + } + + @Test public synchronized void deadlineAlreadyReached() throws Exception { + Timeout timeout = new Timeout(); + timeout.deadlineNanoTime(System.nanoTime()); + double start = now(); + try { + Pipe.waitUntilNotified(this, timeout); + fail(); + } catch (InterruptedIOException expected) { + assertEquals("timeout", expected.getMessage()); + } + assertElapsed(0.0, start); + } + + @Test public synchronized void threadInterrupted() throws Exception { + Timeout timeout = new Timeout(); + double start = now(); + Thread.currentThread().interrupt(); + try { + Pipe.waitUntilNotified(this, timeout); + fail(); + } catch (InterruptedIOException expected) { + assertEquals("interrupted", expected.getMessage()); + assertFalse(Thread.interrupted()); + } + assertElapsed(0.0, start); + } + + /** Returns the nanotime in milliseconds as a double for measuring timeouts. */ + private double now() { + return System.nanoTime() / 1000000.0d; + } + + /** + * Fails the test unless the time from start until now is duration, accepting differences in + * -50..+150 milliseconds. + */ + private void assertElapsed(double duration, double start) { + assertEquals(duration, now() - start + 50d, 100.0); + } +} diff --git a/okhttp/src/main/java/okhttp3/internal/io/Pipe.java b/okhttp/src/main/java/okhttp3/internal/io/Pipe.java new file mode 100644 index 000000000..f1c30578c --- /dev/null +++ b/okhttp/src/main/java/okhttp3/internal/io/Pipe.java @@ -0,0 +1,182 @@ +/* + * Copyright (C) 2016 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okhttp3.internal.io; + +import java.io.IOException; +import java.io.InterruptedIOException; +import okio.Buffer; +import okio.Sink; +import okio.Source; +import okio.Timeout; + +/** + * A source and a sink that are attached. The sink's output is the source's input. Typically each + * is accessed by its own thread: a producer thread writes data to the sink and a consumer thread + * reads data from the source. + * + *

This class uses a buffer to decouple source and sink. This buffer has a user-specified maximum + * size. When a producer thread outruns its consumer the buffer fills up and eventually writes to + * the sink will block until the consumer has caught up. Symmetrically, if a consumer outruns its + * producer reads block until there is data to be read. Limits on the amount of time spent waiting + * for the other party can be configured with {@linkplain Timeout timeouts} on the source and the + * sink. + * + *

When the sink is closed, source reads will continue to complete normally until the buffer has + * been exhausted. At that point reads will return -1, indicating the end of the stream. But if the + * source is closed first, writes to the sink will immediately fail with an {@link IOException}. + */ +public final class Pipe { + final long maxBufferSize; + final Buffer buffer = new Buffer(); + boolean sinkClosed; + boolean sourceClosed; + public final Sink sink = new PipeSink(); + public final Source source = new PipeSource(); + + public Pipe(long maxBufferSize) { + if (maxBufferSize < 1L) { + throw new IllegalArgumentException("maxBufferSize < 1: " + maxBufferSize); + } + this.maxBufferSize = maxBufferSize; + } + + final class PipeSink implements Sink { + final Timeout timeout = new Timeout(); + + @Override public void write(Buffer source, long byteCount) throws IOException { + synchronized (buffer) { + if (sinkClosed) throw new IllegalStateException("closed"); + + while (byteCount > 0) { + if (sourceClosed) throw new IOException("source is closed"); + + long bufferSpaceAvailable = maxBufferSize - buffer.size(); + if (bufferSpaceAvailable == 0) { + waitUntilNotified(buffer, timeout); // Wait until the source drains the buffer. + continue; + } + + long bytesToWrite = Math.min(bufferSpaceAvailable, byteCount); + buffer.write(source, bytesToWrite); + byteCount -= bytesToWrite; + buffer.notifyAll(); // Notify the source that it can resume reading. + } + } + } + + @Override public void flush() throws IOException { + synchronized (buffer) { + if (sinkClosed) throw new IllegalStateException("closed"); + + while (buffer.size() > 0) { + if (sourceClosed) throw new IOException("source is closed"); + waitUntilNotified(buffer, timeout); + } + } + } + + @Override public void close() throws IOException { + synchronized (buffer) { + if (sinkClosed) return; + try { + flush(); + } finally { + sinkClosed = true; + buffer.notifyAll(); // Notify the source that no more bytes are coming. + } + } + } + + @Override public Timeout timeout() { + return timeout; + } + } + + final class PipeSource implements Source { + final Timeout timeout = new Timeout(); + + @Override public long read(Buffer sink, long byteCount) throws IOException { + synchronized (buffer) { + if (sourceClosed) throw new IllegalStateException("closed"); + + while (buffer.size() == 0) { + if (sinkClosed) return -1L; + waitUntilNotified(buffer, timeout); // Wait until the sink fills the buffer. + } + + long result = buffer.read(sink, byteCount); + buffer.notifyAll(); // Notify the sink that it can resume writing. + return result; + } + } + + @Override public void close() throws IOException { + synchronized (buffer) { + sourceClosed = true; + buffer.notifyAll(); // Notify the sink that no more bytes are desired. + } + } + + @Override public Timeout timeout() { + return timeout; + } + } + + /** + * Waits on {@code monitor} until it is notified. Throws {@link InterruptedIOException} if either + * the thread is interrupted or if {@code timeout} elapses before {@code monitor} is notified. The + * caller must be synchronized on {@code monitor}. + */ + static void waitUntilNotified(Object monitor, Timeout timeout) throws IOException { + try { + boolean hasDeadline = timeout.hasDeadline(); + long timeoutNanos = timeout.timeoutNanos(); + + if (!hasDeadline && timeoutNanos == 0L) { + monitor.wait(); // There is no timeout: wait forever. + return; + } + + // Compute how long we'll wait. + long waitNanos; + long start = System.nanoTime(); + if (hasDeadline && timeoutNanos != 0) { + long deadlineNanos = timeout.deadlineNanoTime() - start; + waitNanos = Math.min(timeoutNanos, deadlineNanos); + } else if (hasDeadline) { + waitNanos = timeout.deadlineNanoTime() - start; + } else { + waitNanos = timeoutNanos; + } + + // Attempt to wait that long. This will break out early if the monitor is notified. + long elapsedNanos = 0L; + if (waitNanos > 0L) { + long waitMillis = waitNanos / 1000000L; + monitor.wait(waitMillis, (int) (waitNanos - waitMillis * 1000000L)); + elapsedNanos = System.nanoTime() - start; + } + + // Throw if the timeout elapsed before the monitor was notified. + if (elapsedNanos >= waitNanos) { + throw new InterruptedIOException("timeout"); + } + } catch (InterruptedException e) { + throw new InterruptedIOException("interrupted"); + } + } +} +