From 5fe67f1bfc00a1c2027802515079137f6fd70bea Mon Sep 17 00:00:00 2001 From: jwilson Date: Sun, 26 Jan 2014 23:46:30 -0500 Subject: [PATCH] Implement moving bytes between OkBuffers. This is more complicated than I'd anticipated. --- .../okhttp/internal/bytes/OkBuffer.java | 134 ++++++++++++++++-- .../okhttp/internal/bytes/Segment.java | 99 ++++++++++--- .../okhttp/internal/bytes/OkBufferTest.java | 98 ++++++++++++- 3 files changed, 295 insertions(+), 36 deletions(-) diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/OkBuffer.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/OkBuffer.java index 36162bbb6..0e68841c7 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/OkBuffer.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/OkBuffer.java @@ -17,6 +17,9 @@ package com.squareup.okhttp.internal.bytes; import com.squareup.okhttp.internal.Util; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; /** * A collection of bytes in memory. @@ -37,7 +40,7 @@ public final class OkBuffer implements Source, Sink { private static final char[] HEX_DIGITS = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' }; - private Segment segment; + private Segment head; private long byteCount; public OkBuffer() { @@ -68,14 +71,16 @@ public final class OkBuffer implements Source, Sink { byte[] result = new byte[byteCount]; while (offset < byteCount) { - int toCopy = Math.min(byteCount - offset, segment.limit - segment.pos); - System.arraycopy(segment.data, segment.pos, result, offset, toCopy); + int toCopy = Math.min(byteCount - offset, head.limit - head.pos); + System.arraycopy(head.data, head.pos, result, offset, toCopy); offset += toCopy; - segment.pos += toCopy; + head.pos += toCopy; - if (segment.pos == segment.limit) { - segment = segment.pop(); // Recycle this empty segment. + if (head.pos == head.limit) { + Segment toRecycle = head; + head = toRecycle.pop(); + SegmentPool.INSTANCE.recycle(toRecycle); } } @@ -96,14 +101,14 @@ public final class OkBuffer implements Source, Sink { private void write(byte[] data) { int offset = 0; while (offset < data.length) { - if (segment == null) { - segment = SegmentPool.INSTANCE.take(); // Acquire a first segment. - segment.next = segment.prev = segment; + if (head == null) { + head = SegmentPool.INSTANCE.take(); // Acquire a first segment. + head.next = head.prev = head; } - Segment tail = segment.prev; + Segment tail = head.prev; if (tail.limit == Segment.SIZE) { - tail = tail.push(); // Acquire a new empty segment. + tail = tail.push(SegmentPool.INSTANCE.take()); // Append a new empty segment to fill up. } int toCopy = Math.min(data.length - offset, Segment.SIZE - tail.limit); @@ -117,7 +122,95 @@ public final class OkBuffer implements Source, Sink { } @Override public void write(OkBuffer source, long byteCount, Timeout timeout) { - throw new UnsupportedOperationException(); + // Move bytes from the head of the source buffer to the tail of this buffer + // while balancing two conflicting goals: don't waste CPU and don't waste + // memory. + // + // + // Don't waste CPU (ie. don't copy data around). + // + // Copying large amounts of data is expensive. Instead, we prefer to + // reassign entire segments from one OkBuffer to the other. + // + // + // Don't waste memory. + // + // As an invariant, adjacent pairs of segments in an OkBuffer should be at + // least 50% full, except for the head segment and the tail segment. + // + // The head segment cannot maintain the invariant because the application is + // consuming bytes from this segment, decreasing its level. + // + // The tail segment cannot maintain the invariant because the application is + // producing bytes, which may require new nearly-empty tail segments to be + // appended. + // + // + // Moving segments between buffers + // + // When writing one buffer to another, we prefer to reassign entire segments + // over copying bytes into their most compact form. Suppose we have a buffer + // with these segment levels [91%, 61%]. If we append a buffer with a + // single [72%] segment, that yields [91%, 61%, 72%]. No bytes are copied. + // + // Or suppose we have a buffer with these segment levels: [100%, 2%], and we + // want to append it to a buffer with these segment levels [99%, 3%]. This + // operation will yield the following segments: [100%, 2%, 99%, 3%]. That + // is, we do not spend time copying bytes around to achieve more efficient + // memory use like [100%, 100%, 4%]. + // + // When combining buffers, we will compact adjacent buffers when their + // combined level is less than 100%. For example, when we start with [100%, + // 40%] and append [30%, 80%], the result is [100%, 70%, 80%]. + // + // + // Splitting segments + // + // Occasionally we write only part of a source buffer to a sink buffer. For + // example, given a sink [51%, 91%], we may want to write the first 30% of + // a source [92%, 82%] to it. To simplify, we first transform the source to + // an equivalent buffer [30%, 62%, 82%] and then move the head segment, + // yielding sink [51%, 91%, 30%] and source [62%, 82%]. + + if (source == this) throw new IllegalArgumentException("source == this"); + if (byteCount > source.byteCount) { + throw new IllegalArgumentException( + String.format("requested %s > available %s", byteCount, this.byteCount)); + } + + while (byteCount > 0) { + // Is a prefix of the source's head segment all that we need to move? + if (byteCount < (source.head.limit - source.head.pos)) { + Segment tail = head.prev; + if (head == null || byteCount + (tail.limit - tail.pos) > Segment.SIZE) { + // We're going to need another segment. Split the source's head + // segment in two, then move the first of those two to this buffer. + source.head = source.head.split((int) byteCount); + } else { + // Our existing segments are sufficient. Move bytes from source's head to our tail. + source.head.writeTo(tail, (int) byteCount); + source.byteCount -= byteCount; + this.byteCount += byteCount; + return; + } + } + + // Remove the source's head segment and append it to our tail. + Segment segmentToMove = source.head; + long movedByteCount = segmentToMove.limit - segmentToMove.pos; + source.head = segmentToMove.pop(); + if (head == null) { + head = segmentToMove; + head.next = head.prev = head; + } else { + Segment tail = head.prev; + tail = tail.push(segmentToMove); + tail.compact(); + } + source.byteCount -= movedByteCount; + this.byteCount += movedByteCount; + byteCount -= movedByteCount; + } } @Override public long read(OkBuffer sink, long byteCount, Timeout timeout) throws IOException { @@ -129,11 +222,22 @@ public final class OkBuffer implements Source, Sink { } @Override public void flush(Timeout timeout) { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("Cannot flush() an OkBuffer"); } @Override public void close(Timeout timeout) { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("Cannot close() an OkBuffer"); + } + + /** For testing. This returns the sizes of the segments in this buffer. */ + List segmentSizes() { + if (head == null) return Collections.emptyList(); + List result = new ArrayList(); + result.add(head.limit - head.pos); + for (Segment s = head.next; s != head; s = s.next) { + result.add(s.limit - s.pos); + } + return result; } /** @@ -144,7 +248,7 @@ public final class OkBuffer implements Source, Sink { if (byteCount > 0x100000) return super.toString(); char[] result = new char[(int) (byteCount * 2)]; int offset = 0; - for (Segment s = segment; offset < byteCount; s = s.next) { + for (Segment s = head; offset < byteCount; s = s.next) { for (int i = s.pos; i < s.limit; i++) { result[offset++] = HEX_DIGITS[(s.data[i] >> 4) & 0xf]; result[offset++] = HEX_DIGITS[s.data[i] & 0xf]; diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/Segment.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/Segment.java index a1e32c348..a3fe7c08e 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/Segment.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/Segment.java @@ -31,23 +31,23 @@ final class Segment { // TODO: Is 2 KiB a good default segment size? static final int SIZE = 2048; - final byte[] data; + final byte[] data = new byte[SIZE]; + + /** The next byte of application data byte to read in this segment. */ int pos; + + /** The first byte of available data ready to be written to. */ int limit; - /** Next segment in a linked list. */ + /** Next segment in a linked or circularly-linked list. */ Segment next; - /** Previous segment in a linked list. */ + /** Previous segment in a circularly-linked list. */ Segment prev; - Segment() { - data = new byte[SIZE]; - } - /** - * Removes this head of a circularly-linked list, recycles it, and returns the - * new head of the list. Returns null if the list is now empty. + * Removes this segment of a circularly-linked list and returns its successor. + * Returns null if the list is now empty. */ public Segment pop() { Segment result = next != this ? next : null; @@ -55,20 +55,81 @@ final class Segment { next.prev = prev; next = null; prev = null; - SegmentPool.INSTANCE.recycle(this); return result; } /** - * Acquires a segment and appends it to this tail of a circularly-linked list. - * Returns the new tail segment. + * Appends {@code segment} after this segment in the circularly-linked list. + * Returns the pushed segment. */ - public Segment push() { - Segment result = SegmentPool.INSTANCE.take(); - result.prev = this; - result.next = next; - next.prev = result; - next = result; - return result; + public Segment push(Segment segment) { + segment.prev = this; + segment.next = next; + next.prev = segment; + next = segment; + return segment; + } + + /** + * Splits this head of a circularly-linked list into two segments. The first + * segment contains the data in {@code [pos..pos+byteCount)}. The second + * segment contains the data in {@code [pos+byteCount..limit)}. This can be + * useful when moving partial segments from one OkBuffer to another. + * + *

Returns the new head of the circularly-linked list. + */ + public Segment split(int byteCount) { + int aSize = byteCount; + int bSize = (limit - pos) - byteCount; + if (aSize <= 0 || bSize <= 0) throw new IllegalArgumentException(); + + // Which side of the split is larger? We want to copy as few bytes as possible. + if (aSize < bSize) { + // Create a segment of size 'aSize' before this segment. + Segment before = SegmentPool.INSTANCE.take(); + System.arraycopy(data, pos, before.data, before.pos, aSize); + pos += aSize; + before.limit += aSize; + prev.push(before); + return before; + } else { + // Create a new segment of size 'bSize' after this segment. + Segment after = SegmentPool.INSTANCE.take(); + System.arraycopy(data, pos + aSize, after.data, after.pos, bSize); + limit -= bSize; + after.limit += bSize; + push(after); + return this; + } + } + + /** + * Call this when the tail and its predecessor may both be less than half + * full. This will copy data so that segments can be recycled. + */ + public void compact() { + if (prev == this) throw new IllegalStateException(); + if ((prev.limit - prev.pos) + (limit - pos) > SIZE) return; // Cannot compact. + writeTo(prev, limit - pos); + pop(); + SegmentPool.INSTANCE.recycle(this); + } + + /** Moves {@code byteCount} bytes from {@code sink} to this segment. */ + // TODO: if sink has fewer bytes than this, it may be cheaper to reverse the + // direction of the copy and swap the segments! + public void writeTo(Segment sink, int byteCount) { + if (byteCount + (sink.limit - sink.pos) > SIZE) throw new IllegalArgumentException(); + + if (sink.limit + byteCount > SIZE) { + // We can't fit byteCount bytes at the sink's current position. Compact sink first. + System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos); + sink.limit -= sink.pos; + sink.pos = 0; + } + + System.arraycopy(data, pos, sink.data, sink.limit, byteCount); + sink.limit += byteCount; + pos += byteCount; } } diff --git a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/bytes/OkBufferTest.java b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/bytes/OkBufferTest.java index 3adbfbfb6..5e47e08d8 100644 --- a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/bytes/OkBufferTest.java +++ b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/bytes/OkBufferTest.java @@ -16,8 +16,10 @@ package com.squareup.okhttp.internal.bytes; import java.util.Arrays; +import java.util.List; import org.junit.Test; +import static java.util.Arrays.asList; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -48,8 +50,8 @@ public final class OkBufferTest { @Test public void multipleSegmentBuffers() throws Exception { OkBuffer buffer = new OkBuffer(); buffer.writeUtf8(repeat('a', 1000)); - buffer.writeUtf8(repeat('b', 2500)); - buffer.writeUtf8(repeat('c', 5000)); + buffer.writeUtf8(repeat('b', 2500)); + buffer.writeUtf8(repeat('c', 5000)); buffer.writeUtf8(repeat('d', 10000)); buffer.writeUtf8(repeat('e', 25000)); buffer.writeUtf8(repeat('f', 50000)); @@ -88,6 +90,98 @@ public final class OkBufferTest { assertEquals(0, SegmentPool.INSTANCE.byteCount); } + @Test public void moveBytesBetweenBuffersShareSegment() throws Exception { + int size = (Segment.SIZE / 2) - 1; + List segmentSizes = moveBytesBetweenBuffers(repeat('a', size), repeat('b', size)); + assertEquals(asList(size * 2), segmentSizes); + } + + @Test public void moveBytesBetweenBuffersReassignSegment() throws Exception { + int size = (Segment.SIZE / 2) + 1; + List segmentSizes = moveBytesBetweenBuffers(repeat('a', size), repeat('b', size)); + assertEquals(asList(size, size), segmentSizes); + } + + @Test public void moveBytesBetweenBuffersMultipleSegments() throws Exception { + int size = 3 * Segment.SIZE + 1; + List segmentSizes = moveBytesBetweenBuffers(repeat('a', size), repeat('b', size)); + assertEquals(asList(Segment.SIZE, Segment.SIZE, Segment.SIZE, 1, + Segment.SIZE, Segment.SIZE, Segment.SIZE, 1), segmentSizes); + } + + private List moveBytesBetweenBuffers(String... contents) { + StringBuilder expected = new StringBuilder(); + OkBuffer buffer = new OkBuffer(); + for (String s : contents) { + OkBuffer source = new OkBuffer(); + source.writeUtf8(s); + buffer.write(source, source.byteCount(), Timeout.NONE); + expected.append(s); + } + List segmentSizes = buffer.segmentSizes(); + assertEquals(expected.toString(), buffer.readUtf8(expected.length())); + return segmentSizes; + } + + /** The big part of source's first segment is being moved. */ + @Test public void writeSplitSourceBufferLeft() throws Exception { + int writeSize = Segment.SIZE / 2 + 1; + + OkBuffer sink = new OkBuffer(); + sink.writeUtf8(repeat('b', Segment.SIZE - 10)); + + OkBuffer source = new OkBuffer(); + source.writeUtf8(repeat('a', Segment.SIZE * 2)); + sink.write(source, writeSize, Timeout.NONE); + + assertEquals(asList(Segment.SIZE - 10, writeSize), sink.segmentSizes()); + assertEquals(asList(Segment.SIZE - writeSize, Segment.SIZE), source.segmentSizes()); + } + + /** The big part of source's first segment is staying put. */ + @Test public void writeSplitSourceBufferRight() throws Exception { + int writeSize = Segment.SIZE / 2 - 1; + + OkBuffer sink = new OkBuffer(); + sink.writeUtf8(repeat('b', Segment.SIZE - 10)); + + OkBuffer source = new OkBuffer(); + source.writeUtf8(repeat('a', Segment.SIZE * 2)); + sink.write(source, writeSize, Timeout.NONE); + + assertEquals(asList(Segment.SIZE - 10, writeSize), sink.segmentSizes()); + assertEquals(asList(Segment.SIZE - writeSize, Segment.SIZE), source.segmentSizes()); + } + + @Test public void writePrefixDoesntSplit() throws Exception { + OkBuffer sink = new OkBuffer(); + sink.writeUtf8(repeat('b', 10)); + + OkBuffer source = new OkBuffer(); + source.writeUtf8(repeat('a', Segment.SIZE * 2)); + sink.write(source, 20, Timeout.NONE); + + assertEquals(asList(30), sink.segmentSizes()); + assertEquals(asList(Segment.SIZE - 20, Segment.SIZE), source.segmentSizes()); + assertEquals(30, sink.byteCount()); + assertEquals(Segment.SIZE * 2 - 20, source.byteCount()); + } + + @Test public void writePrefixDoesntSplitButRequiresCompact() throws Exception { + OkBuffer sink = new OkBuffer(); + sink.writeUtf8(repeat('b', Segment.SIZE - 10)); // limit = size - 10 + sink.readUtf8(Segment.SIZE - 20); // pos = size = 20 + + OkBuffer source = new OkBuffer(); + source.writeUtf8(repeat('a', Segment.SIZE * 2)); + sink.write(source, 20, Timeout.NONE); + + assertEquals(asList(30), sink.segmentSizes()); + assertEquals(asList(Segment.SIZE - 20, Segment.SIZE), source.segmentSizes()); + assertEquals(30, sink.byteCount()); + assertEquals(Segment.SIZE * 2 - 20, source.byteCount()); + } + private String repeat(char c, int count) { char[] array = new char[count]; Arrays.fill(array, c);