From 2d01579a18cc50c33147e3e51924515dbdbf950b Mon Sep 17 00:00:00 2001 From: jwilson Date: Thu, 30 Jan 2014 22:20:55 -0500 Subject: [PATCH] Adapters that go the other way, to java.io. These ones are slightly more awkward because they need to do their own internal buffering. --- .../com/squareup/okhttp/internal/Util.java | 2 +- .../okhttp/internal/bytes/OkBuffer.java | 18 +-- .../okhttp/internal/bytes/OkBuffers.java | 103 +++++++++++++++++- .../okhttp/internal/bytes/OkBufferTest.java | 80 +++++++++++++- 4 files changed, 186 insertions(+), 17 deletions(-) diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/Util.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/Util.java index 0cd85e191..fbbf46fd0 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/Util.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/Util.java @@ -82,7 +82,7 @@ public final class Util { return -1; } - public static void checkOffsetAndCount(int arrayLength, int offset, int count) { + public static void checkOffsetAndCount(long arrayLength, long offset, long count) { if ((offset | count) < 0 || offset > arrayLength || arrayLength - offset < count) { throw new ArrayIndexOutOfBoundsException(); } diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/OkBuffer.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/OkBuffer.java index fc6c25f7d..e534dc526 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/OkBuffer.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/OkBuffer.java @@ -21,6 +21,8 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import static com.squareup.okhttp.internal.Util.checkOffsetAndCount; + /** * A collection of bytes in memory. * @@ -147,7 +149,7 @@ public final class OkBuffer implements Source, Sink { } private byte[] readBytes(int byteCount) { - checkByteCount(byteCount); + checkOffsetAndCount(this.byteCount, 0, byteCount); int offset = 0; byte[] result = new byte[byteCount]; @@ -301,7 +303,7 @@ public final class OkBuffer implements Source, Sink { // yielding sink [51%, 91%, 30%] and source [62%, 82%]. if (source == this) throw new IllegalArgumentException("source == this"); - source.checkByteCount(byteCount); + checkOffsetAndCount(source.byteCount, 0, byteCount); while (byteCount > 0) { // Is a prefix of the source's head segment all that we need to move? @@ -365,7 +367,6 @@ public final class OkBuffer implements Source, Sink { } @Override public void flush(Deadline deadline) { - throw new UnsupportedOperationException("Cannot flush() an OkBuffer"); } @Override public void close(Deadline deadline) { @@ -400,15 +401,4 @@ public final class OkBuffer implements Source, Sink { } return new String(result); } - - /** Throws if this has fewer bytes than {@code requested}. */ - void checkByteCount(long requested) { - if (requested < 0) { - throw new IllegalArgumentException("requested < 0: " + requested); - } - if (requested > this.byteCount) { - throw new IllegalArgumentException( - String.format("requested %s > available %s", requested, this.byteCount)); - } - } } diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/OkBuffers.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/OkBuffers.java index ec7c60a3e..c4a17ac9d 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/OkBuffers.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/OkBuffers.java @@ -19,6 +19,8 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import static com.squareup.okhttp.internal.Util.checkOffsetAndCount; + public final class OkBuffers { private OkBuffers() { } @@ -28,7 +30,7 @@ public final class OkBuffers { return new Sink() { @Override public void write(OkBuffer source, long byteCount, Deadline deadline) throws IOException { - source.checkByteCount(byteCount); + checkOffsetAndCount(source.byteCount, 0, byteCount); while (byteCount > 0) { deadline.throwIfReached(); Segment head = source.head; @@ -60,6 +62,52 @@ public final class OkBuffers { }; } + /** + * Returns an output stream that writes to {@code sink}. This may buffer data + * by deferring writes. + */ + public static OutputStream outputStream(final Sink sink) { + return new OutputStream() { + final OkBuffer buffer = new OkBuffer(); // Buffer at most one segment of data. + + @Override public void write(int b) throws IOException { + buffer.writeByte((byte) b); + if (buffer.byteCount == Segment.SIZE) { + sink.write(buffer, buffer.byteCount, Deadline.NONE); + } + } + + @Override public void write(byte[] data, int offset, int byteCount) throws IOException { + checkOffsetAndCount(data.length, offset, byteCount); + int limit = offset + byteCount; + while (offset < limit) { + Segment onlySegment = buffer.writableSegment(1); + int toCopy = Math.min(limit - offset, Segment.SIZE - onlySegment.limit); + System.arraycopy(data, offset, onlySegment.data, onlySegment.limit, toCopy); + offset += toCopy; + onlySegment.limit += toCopy; + buffer.byteCount += toCopy; + if (buffer.byteCount == Segment.SIZE) { + sink.write(buffer, buffer.byteCount, Deadline.NONE); + } + } + } + + @Override public void flush() throws IOException { + sink.write(buffer, buffer.byteCount, Deadline.NONE); + sink.flush(Deadline.NONE); + } + + @Override public void close() throws IOException { + sink.close(Deadline.NONE); + } + + @Override public String toString() { + return "outputStream(" + sink + ")"; + } + }; + } + /** Returns a source that reads from {@code in}. */ public static Source source(final InputStream in) { return new Source() { @@ -85,4 +133,57 @@ public final class OkBuffers { } }; } + + /** + * Returns an input stream that reads from {@code source}. This may buffer + * data by reading extra data eagerly. + */ + public static InputStream inputStream(final Source source) { + return new InputStream() { + final OkBuffer buffer = new OkBuffer(); + + @Override public int read() throws IOException { + if (buffer.byteCount == 0) { + long count = source.read(buffer, Segment.SIZE, Deadline.NONE); + if (count == -1) return -1; + } + return buffer.readByte(); + } + + @Override public int read(byte[] data, int offset, int byteCount) throws IOException { + checkOffsetAndCount(data.length, offset, byteCount); + + if (buffer.byteCount == 0) { + long count = source.read(buffer, Segment.SIZE, Deadline.NONE); + if (count == -1) return -1; + } + + Segment head = buffer.head; + int toCopy = Math.min(byteCount, head.limit - head.pos); + System.arraycopy(head.data, head.pos, data, offset, toCopy); + + head.pos += toCopy; + buffer.byteCount -= toCopy; + + if (head.pos == head.limit) { + buffer.head = head.pop(); + SegmentPool.INSTANCE.recycle(head); + } + + return toCopy; + } + + @Override public int available() throws IOException { + return (int) Math.min(buffer.byteCount, Integer.MAX_VALUE); + } + + @Override public void close() throws IOException { + super.close(); + } + + @Override public String toString() { + return "inputStream(" + source + ")"; + } + }; + } } diff --git a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/bytes/OkBufferTest.java b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/bytes/OkBufferTest.java index ed52cb689..eedb9def7 100644 --- a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/bytes/OkBufferTest.java +++ b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/bytes/OkBufferTest.java @@ -17,7 +17,9 @@ package com.squareup.okhttp.internal.bytes; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.util.Arrays; import java.util.List; import org.junit.Test; @@ -41,7 +43,7 @@ public final class OkBufferTest { try { buffer.readUtf8(1); fail(); - } catch (IllegalArgumentException expected) { + } catch (ArrayIndexOutOfBoundsException expected) { } } @@ -292,6 +294,26 @@ public final class OkBufferTest { assertEquals("a" + repeat('b', 9998) + "c", out.toString("UTF-8")); } + @Test public void outputStreamFromSink() throws Exception { + OkBuffer sink = new OkBuffer(); + OutputStream out = OkBuffers.outputStream(sink); + out.write('a'); + out.write(repeat('b', 9998).getBytes(UTF_8)); + out.write('c'); + out.flush(); + assertEquals("a" + repeat('b', 9998) + "c", sink.readUtf8(10000)); + } + + @Test public void outputStreamFromSinkBounds() throws Exception { + OkBuffer sink = new OkBuffer(); + OutputStream out = OkBuffers.outputStream(sink); + try { + out.write(new byte[100], 50, 51); + fail(); + } catch (ArrayIndexOutOfBoundsException expected) { + } + } + @Test public void sourceFromInputStream() throws Exception { InputStream in = new ByteArrayInputStream( ("a" + repeat('b', Segment.SIZE * 2) + "c").getBytes(UTF_8)); @@ -316,6 +338,62 @@ public final class OkBufferTest { assertEquals(-1, source.read(sink, 1, Deadline.NONE)); } + @Test public void sourceFromInputStreamBounds() throws Exception { + Source source = OkBuffers.source(new ByteArrayInputStream(new byte[100])); + try { + source.read(new OkBuffer(), -1, Deadline.NONE); + fail(); + } catch (IllegalArgumentException expected) { + } + } + + @Test public void inputStreamFromSource() throws Exception { + OkBuffer source = new OkBuffer(); + source.writeUtf8("a"); + source.writeUtf8(repeat('b', Segment.SIZE)); + source.writeUtf8("c"); + + InputStream in = OkBuffers.inputStream(source); + assertEquals(0, in.available()); + assertEquals(Segment.SIZE + 2, source.byteCount()); + + // Reading one byte buffers a full segment. + assertEquals('a', in.read()); + assertEquals(Segment.SIZE - 1, in.available()); + assertEquals(2, source.byteCount()); + + // Reading as much as possible reads the rest of that buffered segment. + byte[] data = new byte[Segment.SIZE * 2]; + assertEquals(Segment.SIZE - 1, in.read(data, 0, data.length)); + assertEquals(repeat('b', Segment.SIZE - 1), new String(data, 0, Segment.SIZE - 1, UTF_8)); + assertEquals(2, source.byteCount()); + + // Continuing to read buffers the next segment. + assertEquals('b', in.read()); + assertEquals(1, in.available()); + assertEquals(0, source.byteCount()); + + // Continuing to read reads from the buffer. + assertEquals('c', in.read()); + assertEquals(0, in.available()); + assertEquals(0, source.byteCount()); + + // Once we've exhausted the source, we're done. + assertEquals(-1, in.read()); + assertEquals(0, source.byteCount()); + } + + @Test public void inputStreamFromSourceBounds() throws IOException { + OkBuffer source = new OkBuffer(); + source.writeUtf8(repeat('a', 100)); + InputStream in = OkBuffers.inputStream(source); + try { + in.read(new byte[100], 50, 51); + fail(); + } catch (ArrayIndexOutOfBoundsException expected) { + } + } + @Test public void writeBytes() throws Exception { OkBuffer data = new OkBuffer(); data.writeByte(0xab);