From bcacc38f7ddd87adce636610c6c3b9d4d2b29873 Mon Sep 17 00:00:00 2001 From: jwilson Date: Mon, 27 Jan 2014 21:38:55 -0500 Subject: [PATCH] More buffer stuff: deadlines, indexOf and read. Rename Timeout to Deadline. I'm still unsatisfied with this API, but at least the new name is accurate. Implement indexOf as a linear scan and read by delegating to write. --- .../bytes/{Timeout.java => Deadline.java} | 14 +-- .../okhttp/internal/bytes/OkBuffer.java | 33 ++++-- .../squareup/okhttp/internal/bytes/Sink.java | 6 +- .../okhttp/internal/bytes/Source.java | 6 +- .../okhttp/internal/bytes/OkBufferTest.java | 102 +++++++++++++++++- 5 files changed, 134 insertions(+), 27 deletions(-) rename okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/{Timeout.java => Deadline.java} (74%) diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/Timeout.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/Deadline.java similarity index 74% rename from okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/Timeout.java rename to okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/Deadline.java index d359c1c7a..ed8572a4f 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/Timeout.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/Deadline.java @@ -18,12 +18,12 @@ package com.squareup.okhttp.internal.bytes; import java.util.concurrent.TimeUnit; /** - * The deadline for a requested operation. If the timeout elapses before the - * operation has completed, the operation should be aborted. + * The time that a requested operation is due. If the deadline is reached before + * the operation has completed, the operation should be aborted. */ -public class Timeout { - public static final Timeout NONE = new Timeout() { - @Override public Timeout start(long timeout, TimeUnit unit) { +public class Deadline { + public static final Deadline NONE = new Deadline() { + @Override public Deadline start(long timeout, TimeUnit unit) { throw new UnsupportedOperationException(); } @@ -34,10 +34,10 @@ public class Timeout { private long deadlineNanos; - public Timeout() { + public Deadline() { } - public Timeout start(long timeout, TimeUnit unit) { + public Deadline start(long timeout, TimeUnit unit) { deadlineNanos = System.nanoTime() + unit.toNanos(timeout); return this; } diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/OkBuffer.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/OkBuffer.java index 0e68841c7..f07c1bc84 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/OkBuffer.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/OkBuffer.java @@ -121,7 +121,7 @@ public final class OkBuffer implements Source, Sink { this.byteCount += data.length; } - @Override public void write(OkBuffer source, long byteCount, Timeout timeout) { + @Override public void write(OkBuffer source, long byteCount, Deadline deadline) { // Move bytes from the head of the source buffer to the tail of this buffer // while balancing two conflicting goals: don't waste CPU and don't waste // memory. @@ -160,8 +160,8 @@ public final class OkBuffer implements Source, Sink { // memory use like [100%, 100%, 4%]. // // When combining buffers, we will compact adjacent buffers when their - // combined level is less than 100%. For example, when we start with [100%, - // 40%] and append [30%, 80%], the result is [100%, 70%, 80%]. + // combined level doesn't exceed 100%. For example, when we start with + // [100%, 40%] and append [30%, 80%], the result is [100%, 70%, 80%]. // // // Splitting segments @@ -213,19 +213,34 @@ public final class OkBuffer implements Source, Sink { } } - @Override public long read(OkBuffer sink, long byteCount, Timeout timeout) throws IOException { - throw new UnsupportedOperationException(); + @Override public long read(OkBuffer sink, long byteCount, Deadline deadline) throws IOException { + if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount); + if (this.byteCount == 0) return -1L; + if (byteCount > this.byteCount) byteCount = this.byteCount; + sink.write(this, byteCount, deadline); + return byteCount; } - @Override public long indexOf(byte b, Timeout timeout) throws IOException { - throw new UnsupportedOperationException(); + @Override public long indexOf(byte b, Deadline deadline) throws IOException { + Segment s = head; + if (s == null) return -1L; + long offset = 0L; + do { + byte[] data = s.data; + for (int pos = s.pos, limit = s.limit; pos < limit; pos++) { + if (data[pos] == b) return offset + pos - s.pos; + } + offset += s.limit - s.pos; + s = s.next; + } while (s != head); + return -1L; } - @Override public void flush(Timeout timeout) { + @Override public void flush(Deadline deadline) { throw new UnsupportedOperationException("Cannot flush() an OkBuffer"); } - @Override public void close(Timeout timeout) { + @Override public void close(Deadline deadline) { throw new UnsupportedOperationException("Cannot close() an OkBuffer"); } diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/Sink.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/Sink.java index 1dfca4b63..1531366be 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/Sink.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/Sink.java @@ -22,15 +22,15 @@ import java.io.IOException; */ public interface Sink { /** Removes {@code byteCount} bytes from {@code source} and appends them to this. */ - void write(OkBuffer source, long byteCount, Timeout timeout) throws IOException; + void write(OkBuffer source, long byteCount, Deadline deadline) throws IOException; /** Pushes all buffered bytes to their final destination. */ - void flush(Timeout timeout) throws IOException; + void flush(Deadline deadline) throws IOException; /** * Pushes all buffered bytes to their final destination and releases the * resources held by this sink. It is an error to write a closed sink. It is * safe to close a sink more than once. */ - void close(Timeout timeout) throws IOException; + void close(Deadline deadline) throws IOException; } diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/Source.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/Source.java index 985cab8e3..233cd5875 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/Source.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/Source.java @@ -25,17 +25,17 @@ public interface Source { * Removes {@code byteCount} bytes from this and appends them to {@code sink}. * Returns the number of bytes actually written. */ - long read(OkBuffer sink, long byteCount, Timeout timeout) throws IOException; + long read(OkBuffer sink, long byteCount, Deadline deadline) throws IOException; /** * Returns the index of {@code b} in this, or -1 if this source is exhausted * first. This may cause this source to buffer a large number of bytes. */ - long indexOf(byte b, Timeout timeout) throws IOException; + long indexOf(byte b, Deadline deadline) throws IOException; /** * Closes this source and releases the resources held by this source. It is an * error to read a closed source. It is safe to close a source more than once. */ - void close(Timeout timeout) throws IOException; + void close(Deadline deadline) throws IOException; } diff --git a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/bytes/OkBufferTest.java b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/bytes/OkBufferTest.java index 5e47e08d8..fbb1e97e3 100644 --- a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/bytes/OkBufferTest.java +++ b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/bytes/OkBufferTest.java @@ -115,7 +115,7 @@ public final class OkBufferTest { for (String s : contents) { OkBuffer source = new OkBuffer(); source.writeUtf8(s); - buffer.write(source, source.byteCount(), Timeout.NONE); + buffer.write(source, source.byteCount(), Deadline.NONE); expected.append(s); } List segmentSizes = buffer.segmentSizes(); @@ -132,7 +132,7 @@ public final class OkBufferTest { OkBuffer source = new OkBuffer(); source.writeUtf8(repeat('a', Segment.SIZE * 2)); - sink.write(source, writeSize, Timeout.NONE); + sink.write(source, writeSize, Deadline.NONE); assertEquals(asList(Segment.SIZE - 10, writeSize), sink.segmentSizes()); assertEquals(asList(Segment.SIZE - writeSize, Segment.SIZE), source.segmentSizes()); @@ -147,7 +147,7 @@ public final class OkBufferTest { OkBuffer source = new OkBuffer(); source.writeUtf8(repeat('a', Segment.SIZE * 2)); - sink.write(source, writeSize, Timeout.NONE); + sink.write(source, writeSize, Deadline.NONE); assertEquals(asList(Segment.SIZE - 10, writeSize), sink.segmentSizes()); assertEquals(asList(Segment.SIZE - writeSize, Segment.SIZE), source.segmentSizes()); @@ -159,7 +159,7 @@ public final class OkBufferTest { OkBuffer source = new OkBuffer(); source.writeUtf8(repeat('a', Segment.SIZE * 2)); - sink.write(source, 20, Timeout.NONE); + sink.write(source, 20, Deadline.NONE); assertEquals(asList(30), sink.segmentSizes()); assertEquals(asList(Segment.SIZE - 20, Segment.SIZE), source.segmentSizes()); @@ -174,7 +174,7 @@ public final class OkBufferTest { OkBuffer source = new OkBuffer(); source.writeUtf8(repeat('a', Segment.SIZE * 2)); - sink.write(source, 20, Timeout.NONE); + sink.write(source, 20, Deadline.NONE); assertEquals(asList(30), sink.segmentSizes()); assertEquals(asList(Segment.SIZE - 20, Segment.SIZE), source.segmentSizes()); @@ -182,6 +182,98 @@ public final class OkBufferTest { assertEquals(Segment.SIZE * 2 - 20, source.byteCount()); } + @Test public void readExhaustedSource() throws Exception { + OkBuffer sink = new OkBuffer(); + sink.writeUtf8(repeat('a', 10)); + + OkBuffer source = new OkBuffer(); + + assertEquals(-1, source.read(sink, 10, Deadline.NONE)); + assertEquals(10, sink.byteCount()); + assertEquals(0, source.byteCount()); + } + + @Test public void readZeroBytesFromSource() throws Exception { + OkBuffer sink = new OkBuffer(); + sink.writeUtf8(repeat('a', 10)); + + OkBuffer source = new OkBuffer(); + + // Either 0 or -1 is reasonable here. For consistency with Android's + // ByteArrayInputStream we return 0. + assertEquals(-1, source.read(sink, 0, Deadline.NONE)); + assertEquals(10, sink.byteCount()); + assertEquals(0, source.byteCount()); + } + + @Test public void moveAllRequestedBytesWithRead() throws Exception { + OkBuffer sink = new OkBuffer(); + sink.writeUtf8(repeat('a', 10)); + + OkBuffer source = new OkBuffer(); + source.writeUtf8(repeat('b', 15)); + + assertEquals(10, source.read(sink, 10, Deadline.NONE)); + assertEquals(20, sink.byteCount()); + assertEquals(5, source.byteCount()); + assertEquals(repeat('a', 10) + repeat('b', 10), sink.readUtf8(20)); + } + + @Test public void moveFewerThanRequestedBytesWithRead() throws Exception { + OkBuffer sink = new OkBuffer(); + sink.writeUtf8(repeat('a', 10)); + + OkBuffer source = new OkBuffer(); + source.writeUtf8(repeat('b', 20)); + + assertEquals(20, source.read(sink, 25, Deadline.NONE)); + assertEquals(30, sink.byteCount()); + assertEquals(0, source.byteCount()); + assertEquals(repeat('a', 10) + repeat('b', 20), sink.readUtf8(30)); + } + + @Test public void indexOf() throws Exception { + OkBuffer buffer = new OkBuffer(); + + // The segment is empty. + assertEquals(-1, buffer.indexOf((byte) 'a', Deadline.NONE)); + + // The segment has one value. + buffer.writeUtf8("a"); // a + assertEquals(0, buffer.indexOf((byte) 'a', Deadline.NONE)); + assertEquals(-1, buffer.indexOf((byte) 'b', Deadline.NONE)); + + // The segment has lots of data. + buffer.writeUtf8(repeat('b', Segment.SIZE - 2)); // ab...b + assertEquals(0, buffer.indexOf((byte) 'a', Deadline.NONE)); + assertEquals(1, buffer.indexOf((byte) 'b', Deadline.NONE)); + assertEquals(-1, buffer.indexOf((byte) 'c', Deadline.NONE)); + + // The segment doesn't start at 0, it starts at 2. + buffer.readUtf8(2); // b...b + assertEquals(-1, buffer.indexOf((byte) 'a', Deadline.NONE)); + assertEquals(0, buffer.indexOf((byte) 'b', Deadline.NONE)); + assertEquals(-1, buffer.indexOf((byte) 'c', Deadline.NONE)); + + // The segment is full. + buffer.writeUtf8("c"); // b...bc + assertEquals(-1, buffer.indexOf((byte) 'a', Deadline.NONE)); + assertEquals(0, buffer.indexOf((byte) 'b', Deadline.NONE)); + assertEquals(Segment.SIZE - 3, buffer.indexOf((byte) 'c', Deadline.NONE)); + + // The segment doesn't start at 2, it starts at 4. + buffer.readUtf8(2); // b...bc + assertEquals(-1, buffer.indexOf((byte) 'a', Deadline.NONE)); + assertEquals(0, buffer.indexOf((byte) 'b', Deadline.NONE)); + assertEquals(Segment.SIZE - 5, buffer.indexOf((byte) 'c', Deadline.NONE)); + + // Two segments. + buffer.writeUtf8("d"); // b...bcd, d is in the 2nd segment. + assertEquals(asList(Segment.SIZE - 4, 1), buffer.segmentSizes()); + assertEquals(Segment.SIZE - 4, buffer.indexOf((byte) 'd', Deadline.NONE)); + assertEquals(-1, buffer.indexOf((byte) 'e', Deadline.NONE)); + } + private String repeat(char c, int count) { char[] array = new char[count]; Arrays.fill(array, c);