1
0
mirror of https://github.com/square/okhttp.git synced 2026-01-24 04:02:07 +03:00

Merge pull request #487 from square/jwilson_0126_buffer_to_buffer

Implement moving bytes between OkBuffers.
This commit is contained in:
Jesse Wilson
2014-01-26 21:49:04 -08:00
3 changed files with 295 additions and 36 deletions

View File

@@ -17,6 +17,9 @@ package com.squareup.okhttp.internal.bytes;
import com.squareup.okhttp.internal.Util;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* A collection of bytes in memory.
@@ -37,7 +40,7 @@ public final class OkBuffer implements Source, Sink {
private static final char[] HEX_DIGITS =
{ '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' };
private Segment segment;
private Segment head;
private long byteCount;
public OkBuffer() {
@@ -68,14 +71,16 @@ public final class OkBuffer implements Source, Sink {
byte[] result = new byte[byteCount];
while (offset < byteCount) {
int toCopy = Math.min(byteCount - offset, segment.limit - segment.pos);
System.arraycopy(segment.data, segment.pos, result, offset, toCopy);
int toCopy = Math.min(byteCount - offset, head.limit - head.pos);
System.arraycopy(head.data, head.pos, result, offset, toCopy);
offset += toCopy;
segment.pos += toCopy;
head.pos += toCopy;
if (segment.pos == segment.limit) {
segment = segment.pop(); // Recycle this empty segment.
if (head.pos == head.limit) {
Segment toRecycle = head;
head = toRecycle.pop();
SegmentPool.INSTANCE.recycle(toRecycle);
}
}
@@ -96,14 +101,14 @@ public final class OkBuffer implements Source, Sink {
private void write(byte[] data) {
int offset = 0;
while (offset < data.length) {
if (segment == null) {
segment = SegmentPool.INSTANCE.take(); // Acquire a first segment.
segment.next = segment.prev = segment;
if (head == null) {
head = SegmentPool.INSTANCE.take(); // Acquire a first segment.
head.next = head.prev = head;
}
Segment tail = segment.prev;
Segment tail = head.prev;
if (tail.limit == Segment.SIZE) {
tail = tail.push(); // Acquire a new empty segment.
tail = tail.push(SegmentPool.INSTANCE.take()); // Append a new empty segment to fill up.
}
int toCopy = Math.min(data.length - offset, Segment.SIZE - tail.limit);
@@ -117,7 +122,95 @@ public final class OkBuffer implements Source, Sink {
}
@Override public void write(OkBuffer source, long byteCount, Timeout timeout) {
throw new UnsupportedOperationException();
// Move bytes from the head of the source buffer to the tail of this buffer
// while balancing two conflicting goals: don't waste CPU and don't waste
// memory.
//
//
// Don't waste CPU (ie. don't copy data around).
//
// Copying large amounts of data is expensive. Instead, we prefer to
// reassign entire segments from one OkBuffer to the other.
//
//
// Don't waste memory.
//
// As an invariant, adjacent pairs of segments in an OkBuffer should be at
// least 50% full, except for the head segment and the tail segment.
//
// The head segment cannot maintain the invariant because the application is
// consuming bytes from this segment, decreasing its level.
//
// The tail segment cannot maintain the invariant because the application is
// producing bytes, which may require new nearly-empty tail segments to be
// appended.
//
//
// Moving segments between buffers
//
// When writing one buffer to another, we prefer to reassign entire segments
// over copying bytes into their most compact form. Suppose we have a buffer
// with these segment levels [91%, 61%]. If we append a buffer with a
// single [72%] segment, that yields [91%, 61%, 72%]. No bytes are copied.
//
// Or suppose we have a buffer with these segment levels: [100%, 2%], and we
// want to append it to a buffer with these segment levels [99%, 3%]. This
// operation will yield the following segments: [100%, 2%, 99%, 3%]. That
// is, we do not spend time copying bytes around to achieve more efficient
// memory use like [100%, 100%, 4%].
//
// When combining buffers, we will compact adjacent buffers when their
// combined level is less than 100%. For example, when we start with [100%,
// 40%] and append [30%, 80%], the result is [100%, 70%, 80%].
//
//
// Splitting segments
//
// Occasionally we write only part of a source buffer to a sink buffer. For
// example, given a sink [51%, 91%], we may want to write the first 30% of
// a source [92%, 82%] to it. To simplify, we first transform the source to
// an equivalent buffer [30%, 62%, 82%] and then move the head segment,
// yielding sink [51%, 91%, 30%] and source [62%, 82%].
if (source == this) throw new IllegalArgumentException("source == this");
if (byteCount > source.byteCount) {
throw new IllegalArgumentException(
String.format("requested %s > available %s", byteCount, this.byteCount));
}
while (byteCount > 0) {
// Is a prefix of the source's head segment all that we need to move?
if (byteCount < (source.head.limit - source.head.pos)) {
Segment tail = head.prev;
if (head == null || byteCount + (tail.limit - tail.pos) > Segment.SIZE) {
// We're going to need another segment. Split the source's head
// segment in two, then move the first of those two to this buffer.
source.head = source.head.split((int) byteCount);
} else {
// Our existing segments are sufficient. Move bytes from source's head to our tail.
source.head.writeTo(tail, (int) byteCount);
source.byteCount -= byteCount;
this.byteCount += byteCount;
return;
}
}
// Remove the source's head segment and append it to our tail.
Segment segmentToMove = source.head;
long movedByteCount = segmentToMove.limit - segmentToMove.pos;
source.head = segmentToMove.pop();
if (head == null) {
head = segmentToMove;
head.next = head.prev = head;
} else {
Segment tail = head.prev;
tail = tail.push(segmentToMove);
tail.compact();
}
source.byteCount -= movedByteCount;
this.byteCount += movedByteCount;
byteCount -= movedByteCount;
}
}
@Override public long read(OkBuffer sink, long byteCount, Timeout timeout) throws IOException {
@@ -129,11 +222,22 @@ public final class OkBuffer implements Source, Sink {
}
@Override public void flush(Timeout timeout) {
throw new UnsupportedOperationException();
throw new UnsupportedOperationException("Cannot flush() an OkBuffer");
}
@Override public void close(Timeout timeout) {
throw new UnsupportedOperationException();
throw new UnsupportedOperationException("Cannot close() an OkBuffer");
}
/** For testing. This returns the sizes of the segments in this buffer. */
List<Integer> segmentSizes() {
if (head == null) return Collections.emptyList();
List<Integer> result = new ArrayList<Integer>();
result.add(head.limit - head.pos);
for (Segment s = head.next; s != head; s = s.next) {
result.add(s.limit - s.pos);
}
return result;
}
/**
@@ -144,7 +248,7 @@ public final class OkBuffer implements Source, Sink {
if (byteCount > 0x100000) return super.toString();
char[] result = new char[(int) (byteCount * 2)];
int offset = 0;
for (Segment s = segment; offset < byteCount; s = s.next) {
for (Segment s = head; offset < byteCount; s = s.next) {
for (int i = s.pos; i < s.limit; i++) {
result[offset++] = HEX_DIGITS[(s.data[i] >> 4) & 0xf];
result[offset++] = HEX_DIGITS[s.data[i] & 0xf];

View File

@@ -31,23 +31,23 @@ final class Segment {
// TODO: Is 2 KiB a good default segment size?
static final int SIZE = 2048;
final byte[] data;
final byte[] data = new byte[SIZE];
/** The next byte of application data byte to read in this segment. */
int pos;
/** The first byte of available data ready to be written to. */
int limit;
/** Next segment in a linked list. */
/** Next segment in a linked or circularly-linked list. */
Segment next;
/** Previous segment in a linked list. */
/** Previous segment in a circularly-linked list. */
Segment prev;
Segment() {
data = new byte[SIZE];
}
/**
* Removes this head of a circularly-linked list, recycles it, and returns the
* new head of the list. Returns null if the list is now empty.
* Removes this segment of a circularly-linked list and returns its successor.
* Returns null if the list is now empty.
*/
public Segment pop() {
Segment result = next != this ? next : null;
@@ -55,20 +55,81 @@ final class Segment {
next.prev = prev;
next = null;
prev = null;
SegmentPool.INSTANCE.recycle(this);
return result;
}
/**
* Acquires a segment and appends it to this tail of a circularly-linked list.
* Returns the new tail segment.
* Appends {@code segment} after this segment in the circularly-linked list.
* Returns the pushed segment.
*/
public Segment push() {
Segment result = SegmentPool.INSTANCE.take();
result.prev = this;
result.next = next;
next.prev = result;
next = result;
return result;
public Segment push(Segment segment) {
segment.prev = this;
segment.next = next;
next.prev = segment;
next = segment;
return segment;
}
/**
* Splits this head of a circularly-linked list into two segments. The first
* segment contains the data in {@code [pos..pos+byteCount)}. The second
* segment contains the data in {@code [pos+byteCount..limit)}. This can be
* useful when moving partial segments from one OkBuffer to another.
*
* <p>Returns the new head of the circularly-linked list.
*/
public Segment split(int byteCount) {
int aSize = byteCount;
int bSize = (limit - pos) - byteCount;
if (aSize <= 0 || bSize <= 0) throw new IllegalArgumentException();
// Which side of the split is larger? We want to copy as few bytes as possible.
if (aSize < bSize) {
// Create a segment of size 'aSize' before this segment.
Segment before = SegmentPool.INSTANCE.take();
System.arraycopy(data, pos, before.data, before.pos, aSize);
pos += aSize;
before.limit += aSize;
prev.push(before);
return before;
} else {
// Create a new segment of size 'bSize' after this segment.
Segment after = SegmentPool.INSTANCE.take();
System.arraycopy(data, pos + aSize, after.data, after.pos, bSize);
limit -= bSize;
after.limit += bSize;
push(after);
return this;
}
}
/**
* Call this when the tail and its predecessor may both be less than half
* full. This will copy data so that segments can be recycled.
*/
public void compact() {
if (prev == this) throw new IllegalStateException();
if ((prev.limit - prev.pos) + (limit - pos) > SIZE) return; // Cannot compact.
writeTo(prev, limit - pos);
pop();
SegmentPool.INSTANCE.recycle(this);
}
/** Moves {@code byteCount} bytes from {@code sink} to this segment. */
// TODO: if sink has fewer bytes than this, it may be cheaper to reverse the
// direction of the copy and swap the segments!
public void writeTo(Segment sink, int byteCount) {
if (byteCount + (sink.limit - sink.pos) > SIZE) throw new IllegalArgumentException();
if (sink.limit + byteCount > SIZE) {
// We can't fit byteCount bytes at the sink's current position. Compact sink first.
System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);
sink.limit -= sink.pos;
sink.pos = 0;
}
System.arraycopy(data, pos, sink.data, sink.limit, byteCount);
sink.limit += byteCount;
pos += byteCount;
}
}

View File

@@ -16,8 +16,10 @@
package com.squareup.okhttp.internal.bytes;
import java.util.Arrays;
import java.util.List;
import org.junit.Test;
import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -48,8 +50,8 @@ public final class OkBufferTest {
@Test public void multipleSegmentBuffers() throws Exception {
OkBuffer buffer = new OkBuffer();
buffer.writeUtf8(repeat('a', 1000));
buffer.writeUtf8(repeat('b', 2500));
buffer.writeUtf8(repeat('c', 5000));
buffer.writeUtf8(repeat('b', 2500));
buffer.writeUtf8(repeat('c', 5000));
buffer.writeUtf8(repeat('d', 10000));
buffer.writeUtf8(repeat('e', 25000));
buffer.writeUtf8(repeat('f', 50000));
@@ -88,6 +90,98 @@ public final class OkBufferTest {
assertEquals(0, SegmentPool.INSTANCE.byteCount);
}
@Test public void moveBytesBetweenBuffersShareSegment() throws Exception {
int size = (Segment.SIZE / 2) - 1;
List<Integer> segmentSizes = moveBytesBetweenBuffers(repeat('a', size), repeat('b', size));
assertEquals(asList(size * 2), segmentSizes);
}
@Test public void moveBytesBetweenBuffersReassignSegment() throws Exception {
int size = (Segment.SIZE / 2) + 1;
List<Integer> segmentSizes = moveBytesBetweenBuffers(repeat('a', size), repeat('b', size));
assertEquals(asList(size, size), segmentSizes);
}
@Test public void moveBytesBetweenBuffersMultipleSegments() throws Exception {
int size = 3 * Segment.SIZE + 1;
List<Integer> segmentSizes = moveBytesBetweenBuffers(repeat('a', size), repeat('b', size));
assertEquals(asList(Segment.SIZE, Segment.SIZE, Segment.SIZE, 1,
Segment.SIZE, Segment.SIZE, Segment.SIZE, 1), segmentSizes);
}
private List<Integer> moveBytesBetweenBuffers(String... contents) {
StringBuilder expected = new StringBuilder();
OkBuffer buffer = new OkBuffer();
for (String s : contents) {
OkBuffer source = new OkBuffer();
source.writeUtf8(s);
buffer.write(source, source.byteCount(), Timeout.NONE);
expected.append(s);
}
List<Integer> segmentSizes = buffer.segmentSizes();
assertEquals(expected.toString(), buffer.readUtf8(expected.length()));
return segmentSizes;
}
/** The big part of source's first segment is being moved. */
@Test public void writeSplitSourceBufferLeft() throws Exception {
int writeSize = Segment.SIZE / 2 + 1;
OkBuffer sink = new OkBuffer();
sink.writeUtf8(repeat('b', Segment.SIZE - 10));
OkBuffer source = new OkBuffer();
source.writeUtf8(repeat('a', Segment.SIZE * 2));
sink.write(source, writeSize, Timeout.NONE);
assertEquals(asList(Segment.SIZE - 10, writeSize), sink.segmentSizes());
assertEquals(asList(Segment.SIZE - writeSize, Segment.SIZE), source.segmentSizes());
}
/** The big part of source's first segment is staying put. */
@Test public void writeSplitSourceBufferRight() throws Exception {
int writeSize = Segment.SIZE / 2 - 1;
OkBuffer sink = new OkBuffer();
sink.writeUtf8(repeat('b', Segment.SIZE - 10));
OkBuffer source = new OkBuffer();
source.writeUtf8(repeat('a', Segment.SIZE * 2));
sink.write(source, writeSize, Timeout.NONE);
assertEquals(asList(Segment.SIZE - 10, writeSize), sink.segmentSizes());
assertEquals(asList(Segment.SIZE - writeSize, Segment.SIZE), source.segmentSizes());
}
@Test public void writePrefixDoesntSplit() throws Exception {
OkBuffer sink = new OkBuffer();
sink.writeUtf8(repeat('b', 10));
OkBuffer source = new OkBuffer();
source.writeUtf8(repeat('a', Segment.SIZE * 2));
sink.write(source, 20, Timeout.NONE);
assertEquals(asList(30), sink.segmentSizes());
assertEquals(asList(Segment.SIZE - 20, Segment.SIZE), source.segmentSizes());
assertEquals(30, sink.byteCount());
assertEquals(Segment.SIZE * 2 - 20, source.byteCount());
}
@Test public void writePrefixDoesntSplitButRequiresCompact() throws Exception {
OkBuffer sink = new OkBuffer();
sink.writeUtf8(repeat('b', Segment.SIZE - 10)); // limit = size - 10
sink.readUtf8(Segment.SIZE - 20); // pos = size = 20
OkBuffer source = new OkBuffer();
source.writeUtf8(repeat('a', Segment.SIZE * 2));
sink.write(source, 20, Timeout.NONE);
assertEquals(asList(30), sink.segmentSizes());
assertEquals(asList(Segment.SIZE - 20, Segment.SIZE), source.segmentSizes());
assertEquals(30, sink.byteCount());
assertEquals(Segment.SIZE * 2 - 20, source.byteCount());
}
private String repeat(char c, int count) {
char[] array = new char[count];
Arrays.fill(array, c);