1
0
mirror of https://github.com/square/okhttp.git synced 2026-01-17 08:42:25 +03:00

Merge pull request #2638 from square/jwilson.0618.pipe

A Pipe.
This commit is contained in:
Jesse Wilson
2016-06-19 00:33:01 -04:00
committed by GitHub
3 changed files with 732 additions and 0 deletions

View File

@@ -0,0 +1,403 @@
/*
* Copyright (C) 2016 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package okhttp3.internal.io;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import okio.Buffer;
import okio.ByteString;
import okio.HashingSink;
import okio.Source;
import org.junit.After;
import org.junit.Test;
import static junit.framework.TestCase.fail;
import static org.junit.Assert.assertEquals;
public final class PipeTest {
final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
@After public void tearDown() throws Exception {
executorService.shutdown();
}
@Test public void test() throws Exception {
Pipe pipe = new Pipe(6);
pipe.sink.write(new Buffer().writeUtf8("abc"), 3L);
Source source = pipe.source;
Buffer readBuffer = new Buffer();
assertEquals(3L, source.read(readBuffer, 6L));
assertEquals("abc", readBuffer.readUtf8());
pipe.sink.close();
assertEquals(-1L, source.read(readBuffer, 6L));
source.close();
}
/**
* A producer writes the first 16 MiB of bytes generated by {@code new Random(0)} to a sink, and a
* consumer consumes them. Both compute hashes of their data to confirm that they're as expected.
*/
@Test public void largeDataset() throws Exception {
final Pipe pipe = new Pipe(1000L); // An awkward size to force producer/consumer exchange.
final long totalBytes = 16L * 1024L * 1024L;
ByteString expectedHash = ByteString.decodeHex("7c3b224bea749086babe079360cf29f98d88262d");
// Write data to the sink.
Future<ByteString> sinkHash = executorService.submit(new Callable<ByteString>() {
@Override public ByteString call() throws Exception {
HashingSink hashingSink = HashingSink.sha1(pipe.sink);
Random random = new Random(0);
byte[] data = new byte[8192];
Buffer buffer = new Buffer();
for (long i = 0L; i < totalBytes; i += data.length) {
random.nextBytes(data);
buffer.write(data);
hashingSink.write(buffer, buffer.size());
}
hashingSink.close();
return hashingSink.hash();
}
});
// Read data from the source.
Future<ByteString> sourceHash = executorService.submit(new Callable<ByteString>() {
@Override public ByteString call() throws Exception {
Buffer blackhole = new Buffer();
HashingSink hashingSink = HashingSink.sha1(blackhole);
Buffer buffer = new Buffer();
while (pipe.source.read(buffer, Long.MAX_VALUE) != -1) {
hashingSink.write(buffer, buffer.size());
blackhole.clear();
}
pipe.source.close();
return hashingSink.hash();
}
});
assertEquals(expectedHash, sinkHash.get());
assertEquals(expectedHash, sourceHash.get());
}
@Test public void sinkTimeout() throws Exception {
Pipe pipe = new Pipe(3);
pipe.sink.timeout().timeout(250, TimeUnit.MILLISECONDS);
pipe.sink.write(new Buffer().writeUtf8("abc"), 3L);
double start = now();
try {
pipe.sink.write(new Buffer().writeUtf8("def"), 3L);
fail();
} catch (InterruptedIOException expected) {
assertEquals("timeout", expected.getMessage());
}
assertElapsed(250.0, start);
Buffer readBuffer = new Buffer();
assertEquals(3L, pipe.source.read(readBuffer, 6L));
assertEquals("abc", readBuffer.readUtf8());
}
@Test public void sourceTimeout() throws Exception {
Pipe pipe = new Pipe(3L);
pipe.source.timeout().timeout(250, TimeUnit.MILLISECONDS);
double start = now();
Buffer readBuffer = new Buffer();
try {
pipe.source.read(readBuffer, 6L);
fail();
} catch (InterruptedIOException expected) {
assertEquals("timeout", expected.getMessage());
}
assertElapsed(250.0, start);
assertEquals(0, readBuffer.size());
}
/**
* The writer is writing 12 bytes as fast as it can to a 3 byte buffer. The reader alternates
* sleeping 250 ms, then reading 3 bytes. That should make for an approximate timeline like this:
*
* 0: writer writes 'abc', blocks 0: reader sleeps until 250 250: reader reads 'abc', sleeps until
* 500 250: writer writes 'def', blocks 500: reader reads 'def', sleeps until 750 500: writer
* writes 'ghi', blocks 750: reader reads 'ghi', sleeps until 1000 750: writer writes 'jkl',
* returns 1000: reader reads 'jkl', returns
*
* Because the writer is writing to a buffer, it finishes before the reader does.
*/
@Test public void sinkBlocksOnSlowReader() throws Exception {
final Pipe pipe = new Pipe(3L);
executorService.execute(new Runnable() {
@Override public void run() {
try {
Buffer buffer = new Buffer();
Thread.sleep(250L);
assertEquals(3, pipe.source.read(buffer, Long.MAX_VALUE));
assertEquals("abc", buffer.readUtf8());
Thread.sleep(250L);
assertEquals(3, pipe.source.read(buffer, Long.MAX_VALUE));
assertEquals("def", buffer.readUtf8());
Thread.sleep(250L);
assertEquals(3, pipe.source.read(buffer, Long.MAX_VALUE));
assertEquals("ghi", buffer.readUtf8());
Thread.sleep(250L);
assertEquals(3, pipe.source.read(buffer, Long.MAX_VALUE));
assertEquals("jkl", buffer.readUtf8());
} catch (IOException | InterruptedException e) {
throw new AssertionError();
}
}
});
double start = now();
pipe.sink.write(new Buffer().writeUtf8("abcdefghijkl"), 12);
assertElapsed(750.0, start);
}
@Test public void sinkWriteFailsByClosedReader() throws Exception {
final Pipe pipe = new Pipe(3L);
executorService.schedule(new Runnable() {
@Override public void run() {
try {
pipe.source.close();
} catch (IOException e) {
throw new AssertionError();
}
}
}, 250, TimeUnit.MILLISECONDS);
double start = now();
try {
pipe.sink.write(new Buffer().writeUtf8("abcdef"), 6);
fail();
} catch (IOException expected) {
assertEquals("source is closed", expected.getMessage());
assertElapsed(250.0, start);
}
}
@Test public void sinkFlushWaitsForReaderToReadEverything() throws Exception {
final Buffer readBuffer = new Buffer();
final Pipe pipe = new Pipe(100L);
executorService.execute(new Runnable() {
@Override public void run() {
try {
Thread.sleep(250);
pipe.source.read(readBuffer, 3);
Thread.sleep(250);
pipe.source.read(readBuffer, 3);
} catch (InterruptedException | IOException e) {
throw new AssertionError(e);
}
}
});
double start = now();
pipe.sink.write(new Buffer().writeUtf8("abcdef"), 6);
pipe.sink.flush();
assertElapsed(500.0, start);
assertEquals("abcdef", readBuffer.readUtf8());
}
@Test public void sinkFlushFailsIfReaderIsClosedBeforeAllDataIsRead() throws Exception {
final Pipe pipe = new Pipe(100L);
executorService.execute(new Runnable() {
@Override public void run() {
try {
Thread.sleep(250);
pipe.source.read(new Buffer(), 3);
Thread.sleep(250);
pipe.source.close();
} catch (InterruptedException | IOException e) {
throw new AssertionError(e);
}
}
});
double start = now();
pipe.sink.write(new Buffer().writeUtf8("abcdef"), 6);
try {
pipe.sink.flush();
fail();
} catch (IOException expected) {
assertEquals("source is closed", expected.getMessage());
assertElapsed(500.0, start);
}
}
@Test public void sinkCloseFailsIfReaderIsClosedBeforeAllDataIsRead() throws Exception {
final Pipe pipe = new Pipe(100L);
executorService.execute(new Runnable() {
@Override public void run() {
try {
Thread.sleep(250);
pipe.source.read(new Buffer(), 3);
Thread.sleep(250);
pipe.source.close();
} catch (InterruptedException | IOException e) {
throw new AssertionError(e);
}
}
});
double start = now();
pipe.sink.write(new Buffer().writeUtf8("abcdef"), 6);
try {
pipe.sink.close();
fail();
} catch (IOException expected) {
assertEquals("source is closed", expected.getMessage());
assertElapsed(500.0, start);
}
try {
pipe.sink.flush();
fail();
} catch (IllegalStateException expected) {
assertEquals("closed", expected.getMessage());
}
}
@Test public void sinkClose() throws Exception {
Pipe pipe = new Pipe(100L);
pipe.sink.close();
try {
pipe.sink.write(new Buffer().writeUtf8("abc"), 3);
fail();
} catch (IllegalStateException expected) {
assertEquals("closed", expected.getMessage());
}
try {
pipe.sink.flush();
fail();
} catch (IllegalStateException expected) {
assertEquals("closed", expected.getMessage());
}
}
@Test public void sinkMultipleClose() throws Exception {
Pipe pipe = new Pipe(100L);
pipe.sink.close();
pipe.sink.close();
}
@Test public void sourceClose() throws Exception {
Pipe pipe = new Pipe(100L);
pipe.source.close();
try {
pipe.source.read(new Buffer(), 3);
fail();
} catch (IllegalStateException expected) {
assertEquals("closed", expected.getMessage());
}
}
@Test public void sourceMultipleClose() throws Exception {
Pipe pipe = new Pipe(100L);
pipe.source.close();
pipe.source.close();
}
@Test public void sourceReadUnblockedByClosedSink() throws Exception {
final Pipe pipe = new Pipe(3L);
executorService.schedule(new Runnable() {
@Override public void run() {
try {
pipe.sink.close();
} catch (IOException e) {
throw new AssertionError();
}
}
}, 250, TimeUnit.MILLISECONDS);
double start = now();
Buffer readBuffer = new Buffer();
assertEquals(-1, pipe.source.read(readBuffer, Long.MAX_VALUE));
assertEquals(0, readBuffer.size());
assertElapsed(250.0, start);
}
/**
* The writer has 12 bytes to write. It alternates sleeping 250 ms, then writing 3 bytes. The
* reader is reading as fast as it can. That should make for an approximate timeline like this:
*
* 0: writer sleeps until 250 0: reader blocks 250: writer writes 'abc', sleeps until 500 250:
* reader reads 'abc' 500: writer writes 'def', sleeps until 750 500: reader reads 'def' 750:
* writer writes 'ghi', sleeps until 1000 750: reader reads 'ghi' 1000: writer writes 'jkl',
* returns 1000: reader reads 'jkl', returns
*/
@Test public void sourceBlocksOnSlowWriter() throws Exception {
final Pipe pipe = new Pipe(100L);
executorService.execute(new Runnable() {
@Override public void run() {
try {
Thread.sleep(250L);
pipe.sink.write(new Buffer().writeUtf8("abc"), 3);
Thread.sleep(250L);
pipe.sink.write(new Buffer().writeUtf8("def"), 3);
Thread.sleep(250L);
pipe.sink.write(new Buffer().writeUtf8("ghi"), 3);
Thread.sleep(250L);
pipe.sink.write(new Buffer().writeUtf8("jkl"), 3);
} catch (IOException | InterruptedException e) {
throw new AssertionError();
}
}
});
double start = now();
Buffer readBuffer = new Buffer();
assertEquals(3, pipe.source.read(readBuffer, Long.MAX_VALUE));
assertEquals("abc", readBuffer.readUtf8());
assertElapsed(250.0, start);
assertEquals(3, pipe.source.read(readBuffer, Long.MAX_VALUE));
assertEquals("def", readBuffer.readUtf8());
assertElapsed(500.0, start);
assertEquals(3, pipe.source.read(readBuffer, Long.MAX_VALUE));
assertEquals("ghi", readBuffer.readUtf8());
assertElapsed(750.0, start);
assertEquals(3, pipe.source.read(readBuffer, Long.MAX_VALUE));
assertEquals("jkl", readBuffer.readUtf8());
assertElapsed(1000.0, start);
}
/** Returns the nanotime in milliseconds as a double for measuring timeouts. */
private double now() {
return System.nanoTime() / 1000000.0d;
}
/**
* Fails the test unless the time from start until now is duration, accepting differences in
* -50..+150 milliseconds.
*/
private void assertElapsed(double duration, double start) {
assertEquals(duration, now() - start + 50d, 100.0);
}
}

View File

@@ -0,0 +1,147 @@
/*
* Copyright (C) 2016 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package okhttp3.internal.io;
import java.io.InterruptedIOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import okio.Timeout;
import org.junit.After;
import org.junit.Test;
import static junit.framework.TestCase.fail;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
public final class WaitUntilNotifiedTest {
final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(0);
@After public void tearDown() throws Exception {
executorService.shutdown();
}
@Test public synchronized void notified() throws Exception {
Timeout timeout = new Timeout();
timeout.timeout(5000, TimeUnit.MILLISECONDS);
double start = now();
executorService.schedule(new Runnable() {
@Override public void run() {
synchronized (WaitUntilNotifiedTest.this) {
WaitUntilNotifiedTest.this.notify();
}
}
}, 250, TimeUnit.MILLISECONDS);
Pipe.waitUntilNotified(this, timeout);
assertElapsed(250.0, start);
}
@Test public synchronized void timeout() throws Exception {
Timeout timeout = new Timeout();
timeout.timeout(250, TimeUnit.MILLISECONDS);
double start = now();
try {
Pipe.waitUntilNotified(this, timeout);
fail();
} catch (InterruptedIOException expected) {
assertEquals("timeout", expected.getMessage());
}
assertElapsed(250.0, start);
}
@Test public synchronized void deadline() throws Exception {
Timeout timeout = new Timeout();
timeout.deadline(250, TimeUnit.MILLISECONDS);
double start = now();
try {
Pipe.waitUntilNotified(this, timeout);
fail();
} catch (InterruptedIOException expected) {
assertEquals("timeout", expected.getMessage());
}
assertElapsed(250.0, start);
}
@Test public synchronized void deadlineBeforeTimeout() throws Exception {
Timeout timeout = new Timeout();
timeout.timeout(5000, TimeUnit.MILLISECONDS);
timeout.deadline(250, TimeUnit.MILLISECONDS);
double start = now();
try {
Pipe.waitUntilNotified(this, timeout);
fail();
} catch (InterruptedIOException expected) {
assertEquals("timeout", expected.getMessage());
}
assertElapsed(250.0, start);
}
@Test public synchronized void timeoutBeforeDeadline() throws Exception {
Timeout timeout = new Timeout();
timeout.timeout(250, TimeUnit.MILLISECONDS);
timeout.deadline(5000, TimeUnit.MILLISECONDS);
double start = now();
try {
Pipe.waitUntilNotified(this, timeout);
fail();
} catch (InterruptedIOException expected) {
assertEquals("timeout", expected.getMessage());
}
assertElapsed(250.0, start);
}
@Test public synchronized void deadlineAlreadyReached() throws Exception {
Timeout timeout = new Timeout();
timeout.deadlineNanoTime(System.nanoTime());
double start = now();
try {
Pipe.waitUntilNotified(this, timeout);
fail();
} catch (InterruptedIOException expected) {
assertEquals("timeout", expected.getMessage());
}
assertElapsed(0.0, start);
}
@Test public synchronized void threadInterrupted() throws Exception {
Timeout timeout = new Timeout();
double start = now();
Thread.currentThread().interrupt();
try {
Pipe.waitUntilNotified(this, timeout);
fail();
} catch (InterruptedIOException expected) {
assertEquals("interrupted", expected.getMessage());
assertFalse(Thread.interrupted());
}
assertElapsed(0.0, start);
}
/** Returns the nanotime in milliseconds as a double for measuring timeouts. */
private double now() {
return System.nanoTime() / 1000000.0d;
}
/**
* Fails the test unless the time from start until now is duration, accepting differences in
* -50..+150 milliseconds.
*/
private void assertElapsed(double duration, double start) {
assertEquals(duration, now() - start + 50d, 100.0);
}
}

View File

@@ -0,0 +1,182 @@
/*
* Copyright (C) 2016 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package okhttp3.internal.io;
import java.io.IOException;
import java.io.InterruptedIOException;
import okio.Buffer;
import okio.Sink;
import okio.Source;
import okio.Timeout;
/**
* A source and a sink that are attached. The sink's output is the source's input. Typically each
* is accessed by its own thread: a producer thread writes data to the sink and a consumer thread
* reads data from the source.
*
* <p>This class uses a buffer to decouple source and sink. This buffer has a user-specified maximum
* size. When a producer thread outruns its consumer the buffer fills up and eventually writes to
* the sink will block until the consumer has caught up. Symmetrically, if a consumer outruns its
* producer reads block until there is data to be read. Limits on the amount of time spent waiting
* for the other party can be configured with {@linkplain Timeout timeouts} on the source and the
* sink.
*
* <p>When the sink is closed, source reads will continue to complete normally until the buffer has
* been exhausted. At that point reads will return -1, indicating the end of the stream. But if the
* source is closed first, writes to the sink will immediately fail with an {@link IOException}.
*/
public final class Pipe {
final long maxBufferSize;
final Buffer buffer = new Buffer();
boolean sinkClosed;
boolean sourceClosed;
public final Sink sink = new PipeSink();
public final Source source = new PipeSource();
public Pipe(long maxBufferSize) {
if (maxBufferSize < 1L) {
throw new IllegalArgumentException("maxBufferSize < 1: " + maxBufferSize);
}
this.maxBufferSize = maxBufferSize;
}
final class PipeSink implements Sink {
final Timeout timeout = new Timeout();
@Override public void write(Buffer source, long byteCount) throws IOException {
synchronized (buffer) {
if (sinkClosed) throw new IllegalStateException("closed");
while (byteCount > 0) {
if (sourceClosed) throw new IOException("source is closed");
long bufferSpaceAvailable = maxBufferSize - buffer.size();
if (bufferSpaceAvailable == 0) {
waitUntilNotified(buffer, timeout); // Wait until the source drains the buffer.
continue;
}
long bytesToWrite = Math.min(bufferSpaceAvailable, byteCount);
buffer.write(source, bytesToWrite);
byteCount -= bytesToWrite;
buffer.notifyAll(); // Notify the source that it can resume reading.
}
}
}
@Override public void flush() throws IOException {
synchronized (buffer) {
if (sinkClosed) throw new IllegalStateException("closed");
while (buffer.size() > 0) {
if (sourceClosed) throw new IOException("source is closed");
waitUntilNotified(buffer, timeout);
}
}
}
@Override public void close() throws IOException {
synchronized (buffer) {
if (sinkClosed) return;
try {
flush();
} finally {
sinkClosed = true;
buffer.notifyAll(); // Notify the source that no more bytes are coming.
}
}
}
@Override public Timeout timeout() {
return timeout;
}
}
final class PipeSource implements Source {
final Timeout timeout = new Timeout();
@Override public long read(Buffer sink, long byteCount) throws IOException {
synchronized (buffer) {
if (sourceClosed) throw new IllegalStateException("closed");
while (buffer.size() == 0) {
if (sinkClosed) return -1L;
waitUntilNotified(buffer, timeout); // Wait until the sink fills the buffer.
}
long result = buffer.read(sink, byteCount);
buffer.notifyAll(); // Notify the sink that it can resume writing.
return result;
}
}
@Override public void close() throws IOException {
synchronized (buffer) {
sourceClosed = true;
buffer.notifyAll(); // Notify the sink that no more bytes are desired.
}
}
@Override public Timeout timeout() {
return timeout;
}
}
/**
* Waits on {@code monitor} until it is notified. Throws {@link InterruptedIOException} if either
* the thread is interrupted or if {@code timeout} elapses before {@code monitor} is notified. The
* caller must be synchronized on {@code monitor}.
*/
static void waitUntilNotified(Object monitor, Timeout timeout) throws IOException {
try {
boolean hasDeadline = timeout.hasDeadline();
long timeoutNanos = timeout.timeoutNanos();
if (!hasDeadline && timeoutNanos == 0L) {
monitor.wait(); // There is no timeout: wait forever.
return;
}
// Compute how long we'll wait.
long waitNanos;
long start = System.nanoTime();
if (hasDeadline && timeoutNanos != 0) {
long deadlineNanos = timeout.deadlineNanoTime() - start;
waitNanos = Math.min(timeoutNanos, deadlineNanos);
} else if (hasDeadline) {
waitNanos = timeout.deadlineNanoTime() - start;
} else {
waitNanos = timeoutNanos;
}
// Attempt to wait that long. This will break out early if the monitor is notified.
long elapsedNanos = 0L;
if (waitNanos > 0L) {
long waitMillis = waitNanos / 1000000L;
monitor.wait(waitMillis, (int) (waitNanos - waitMillis * 1000000L));
elapsedNanos = System.nanoTime() - start;
}
// Throw if the timeout elapsed before the monitor was notified.
if (elapsedNanos >= waitNanos) {
throw new InterruptedIOException("timeout");
}
} catch (InterruptedException e) {
throw new InterruptedIOException("interrupted");
}
}
}