mirror of
https://github.com/square/okhttp.git
synced 2026-01-24 04:02:07 +03:00
Implement moving bytes between OkBuffers.
This is more complicated than I'd anticipated.
This commit is contained in:
@@ -17,6 +17,9 @@ package com.squareup.okhttp.internal.bytes;
|
||||
|
||||
import com.squareup.okhttp.internal.Util;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* A collection of bytes in memory.
|
||||
@@ -37,7 +40,7 @@ public final class OkBuffer implements Source, Sink {
|
||||
private static final char[] HEX_DIGITS =
|
||||
{ '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' };
|
||||
|
||||
private Segment segment;
|
||||
private Segment head;
|
||||
private long byteCount;
|
||||
|
||||
public OkBuffer() {
|
||||
@@ -68,14 +71,16 @@ public final class OkBuffer implements Source, Sink {
|
||||
byte[] result = new byte[byteCount];
|
||||
|
||||
while (offset < byteCount) {
|
||||
int toCopy = Math.min(byteCount - offset, segment.limit - segment.pos);
|
||||
System.arraycopy(segment.data, segment.pos, result, offset, toCopy);
|
||||
int toCopy = Math.min(byteCount - offset, head.limit - head.pos);
|
||||
System.arraycopy(head.data, head.pos, result, offset, toCopy);
|
||||
|
||||
offset += toCopy;
|
||||
segment.pos += toCopy;
|
||||
head.pos += toCopy;
|
||||
|
||||
if (segment.pos == segment.limit) {
|
||||
segment = segment.pop(); // Recycle this empty segment.
|
||||
if (head.pos == head.limit) {
|
||||
Segment toRecycle = head;
|
||||
head = toRecycle.pop();
|
||||
SegmentPool.INSTANCE.recycle(toRecycle);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -96,14 +101,14 @@ public final class OkBuffer implements Source, Sink {
|
||||
private void write(byte[] data) {
|
||||
int offset = 0;
|
||||
while (offset < data.length) {
|
||||
if (segment == null) {
|
||||
segment = SegmentPool.INSTANCE.take(); // Acquire a first segment.
|
||||
segment.next = segment.prev = segment;
|
||||
if (head == null) {
|
||||
head = SegmentPool.INSTANCE.take(); // Acquire a first segment.
|
||||
head.next = head.prev = head;
|
||||
}
|
||||
|
||||
Segment tail = segment.prev;
|
||||
Segment tail = head.prev;
|
||||
if (tail.limit == Segment.SIZE) {
|
||||
tail = tail.push(); // Acquire a new empty segment.
|
||||
tail = tail.push(SegmentPool.INSTANCE.take()); // Append a new empty segment to fill up.
|
||||
}
|
||||
|
||||
int toCopy = Math.min(data.length - offset, Segment.SIZE - tail.limit);
|
||||
@@ -117,7 +122,95 @@ public final class OkBuffer implements Source, Sink {
|
||||
}
|
||||
|
||||
@Override public void write(OkBuffer source, long byteCount, Timeout timeout) {
|
||||
throw new UnsupportedOperationException();
|
||||
// Move bytes from the head of the source buffer to the tail of this buffer
|
||||
// while balancing two conflicting goals: don't waste CPU and don't waste
|
||||
// memory.
|
||||
//
|
||||
//
|
||||
// Don't waste CPU (ie. don't copy data around).
|
||||
//
|
||||
// Copying large amounts of data is expensive. Instead, we prefer to
|
||||
// reassign entire segments from one OkBuffer to the other.
|
||||
//
|
||||
//
|
||||
// Don't waste memory.
|
||||
//
|
||||
// As an invariant, adjacent pairs of segments in an OkBuffer should be at
|
||||
// least 50% full, except for the head segment and the tail segment.
|
||||
//
|
||||
// The head segment cannot maintain the invariant because the application is
|
||||
// consuming bytes from this segment, decreasing its level.
|
||||
//
|
||||
// The tail segment cannot maintain the invariant because the application is
|
||||
// producing bytes, which may require new nearly-empty tail segments to be
|
||||
// appended.
|
||||
//
|
||||
//
|
||||
// Moving segments between buffers
|
||||
//
|
||||
// When writing one buffer to another, we prefer to reassign entire segments
|
||||
// over copying bytes into their most compact form. Suppose we have a buffer
|
||||
// with these segment levels [91%, 61%]. If we append a buffer with a
|
||||
// single [72%] segment, that yields [91%, 61%, 72%]. No bytes are copied.
|
||||
//
|
||||
// Or suppose we have a buffer with these segment levels: [100%, 2%], and we
|
||||
// want to append it to a buffer with these segment levels [99%, 3%]. This
|
||||
// operation will yield the following segments: [100%, 2%, 99%, 3%]. That
|
||||
// is, we do not spend time copying bytes around to achieve more efficient
|
||||
// memory use like [100%, 100%, 4%].
|
||||
//
|
||||
// When combining buffers, we will compact adjacent buffers when their
|
||||
// combined level is less than 100%. For example, when we start with [100%,
|
||||
// 40%] and append [30%, 80%], the result is [100%, 70%, 80%].
|
||||
//
|
||||
//
|
||||
// Splitting segments
|
||||
//
|
||||
// Occasionally we write only part of a source buffer to a sink buffer. For
|
||||
// example, given a sink [51%, 91%], we may want to write the first 30% of
|
||||
// a source [92%, 82%] to it. To simplify, we first transform the source to
|
||||
// an equivalent buffer [30%, 62%, 82%] and then move the head segment,
|
||||
// yielding sink [51%, 91%, 30%] and source [62%, 82%].
|
||||
|
||||
if (source == this) throw new IllegalArgumentException("source == this");
|
||||
if (byteCount > source.byteCount) {
|
||||
throw new IllegalArgumentException(
|
||||
String.format("requested %s > available %s", byteCount, this.byteCount));
|
||||
}
|
||||
|
||||
while (byteCount > 0) {
|
||||
// Is a prefix of the source's head segment all that we need to move?
|
||||
if (byteCount < (source.head.limit - source.head.pos)) {
|
||||
Segment tail = head.prev;
|
||||
if (head == null || byteCount + (tail.limit - tail.pos) > Segment.SIZE) {
|
||||
// We're going to need another segment. Split the source's head
|
||||
// segment in two, then move the first of those two to this buffer.
|
||||
source.head = source.head.split((int) byteCount);
|
||||
} else {
|
||||
// Our existing segments are sufficient. Move bytes from source's head to our tail.
|
||||
source.head.writeTo(tail, (int) byteCount);
|
||||
source.byteCount -= byteCount;
|
||||
this.byteCount += byteCount;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Remove the source's head segment and append it to our tail.
|
||||
Segment segmentToMove = source.head;
|
||||
long movedByteCount = segmentToMove.limit - segmentToMove.pos;
|
||||
source.head = segmentToMove.pop();
|
||||
if (head == null) {
|
||||
head = segmentToMove;
|
||||
head.next = head.prev = head;
|
||||
} else {
|
||||
Segment tail = head.prev;
|
||||
tail = tail.push(segmentToMove);
|
||||
tail.compact();
|
||||
}
|
||||
source.byteCount -= movedByteCount;
|
||||
this.byteCount += movedByteCount;
|
||||
byteCount -= movedByteCount;
|
||||
}
|
||||
}
|
||||
|
||||
@Override public long read(OkBuffer sink, long byteCount, Timeout timeout) throws IOException {
|
||||
@@ -129,11 +222,22 @@ public final class OkBuffer implements Source, Sink {
|
||||
}
|
||||
|
||||
@Override public void flush(Timeout timeout) {
|
||||
throw new UnsupportedOperationException();
|
||||
throw new UnsupportedOperationException("Cannot flush() an OkBuffer");
|
||||
}
|
||||
|
||||
@Override public void close(Timeout timeout) {
|
||||
throw new UnsupportedOperationException();
|
||||
throw new UnsupportedOperationException("Cannot close() an OkBuffer");
|
||||
}
|
||||
|
||||
/** For testing. This returns the sizes of the segments in this buffer. */
|
||||
List<Integer> segmentSizes() {
|
||||
if (head == null) return Collections.emptyList();
|
||||
List<Integer> result = new ArrayList<Integer>();
|
||||
result.add(head.limit - head.pos);
|
||||
for (Segment s = head.next; s != head; s = s.next) {
|
||||
result.add(s.limit - s.pos);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -144,7 +248,7 @@ public final class OkBuffer implements Source, Sink {
|
||||
if (byteCount > 0x100000) return super.toString();
|
||||
char[] result = new char[(int) (byteCount * 2)];
|
||||
int offset = 0;
|
||||
for (Segment s = segment; offset < byteCount; s = s.next) {
|
||||
for (Segment s = head; offset < byteCount; s = s.next) {
|
||||
for (int i = s.pos; i < s.limit; i++) {
|
||||
result[offset++] = HEX_DIGITS[(s.data[i] >> 4) & 0xf];
|
||||
result[offset++] = HEX_DIGITS[s.data[i] & 0xf];
|
||||
|
||||
@@ -31,23 +31,23 @@ final class Segment {
|
||||
// TODO: Is 2 KiB a good default segment size?
|
||||
static final int SIZE = 2048;
|
||||
|
||||
final byte[] data;
|
||||
final byte[] data = new byte[SIZE];
|
||||
|
||||
/** The next byte of application data byte to read in this segment. */
|
||||
int pos;
|
||||
|
||||
/** The first byte of available data ready to be written to. */
|
||||
int limit;
|
||||
|
||||
/** Next segment in a linked list. */
|
||||
/** Next segment in a linked or circularly-linked list. */
|
||||
Segment next;
|
||||
|
||||
/** Previous segment in a linked list. */
|
||||
/** Previous segment in a circularly-linked list. */
|
||||
Segment prev;
|
||||
|
||||
Segment() {
|
||||
data = new byte[SIZE];
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes this head of a circularly-linked list, recycles it, and returns the
|
||||
* new head of the list. Returns null if the list is now empty.
|
||||
* Removes this segment of a circularly-linked list and returns its successor.
|
||||
* Returns null if the list is now empty.
|
||||
*/
|
||||
public Segment pop() {
|
||||
Segment result = next != this ? next : null;
|
||||
@@ -55,20 +55,81 @@ final class Segment {
|
||||
next.prev = prev;
|
||||
next = null;
|
||||
prev = null;
|
||||
SegmentPool.INSTANCE.recycle(this);
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Acquires a segment and appends it to this tail of a circularly-linked list.
|
||||
* Returns the new tail segment.
|
||||
* Appends {@code segment} after this segment in the circularly-linked list.
|
||||
* Returns the pushed segment.
|
||||
*/
|
||||
public Segment push() {
|
||||
Segment result = SegmentPool.INSTANCE.take();
|
||||
result.prev = this;
|
||||
result.next = next;
|
||||
next.prev = result;
|
||||
next = result;
|
||||
return result;
|
||||
public Segment push(Segment segment) {
|
||||
segment.prev = this;
|
||||
segment.next = next;
|
||||
next.prev = segment;
|
||||
next = segment;
|
||||
return segment;
|
||||
}
|
||||
|
||||
/**
|
||||
* Splits this head of a circularly-linked list into two segments. The first
|
||||
* segment contains the data in {@code [pos..pos+byteCount)}. The second
|
||||
* segment contains the data in {@code [pos+byteCount..limit)}. This can be
|
||||
* useful when moving partial segments from one OkBuffer to another.
|
||||
*
|
||||
* <p>Returns the new head of the circularly-linked list.
|
||||
*/
|
||||
public Segment split(int byteCount) {
|
||||
int aSize = byteCount;
|
||||
int bSize = (limit - pos) - byteCount;
|
||||
if (aSize <= 0 || bSize <= 0) throw new IllegalArgumentException();
|
||||
|
||||
// Which side of the split is larger? We want to copy as few bytes as possible.
|
||||
if (aSize < bSize) {
|
||||
// Create a segment of size 'aSize' before this segment.
|
||||
Segment before = SegmentPool.INSTANCE.take();
|
||||
System.arraycopy(data, pos, before.data, before.pos, aSize);
|
||||
pos += aSize;
|
||||
before.limit += aSize;
|
||||
prev.push(before);
|
||||
return before;
|
||||
} else {
|
||||
// Create a new segment of size 'bSize' after this segment.
|
||||
Segment after = SegmentPool.INSTANCE.take();
|
||||
System.arraycopy(data, pos + aSize, after.data, after.pos, bSize);
|
||||
limit -= bSize;
|
||||
after.limit += bSize;
|
||||
push(after);
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Call this when the tail and its predecessor may both be less than half
|
||||
* full. This will copy data so that segments can be recycled.
|
||||
*/
|
||||
public void compact() {
|
||||
if (prev == this) throw new IllegalStateException();
|
||||
if ((prev.limit - prev.pos) + (limit - pos) > SIZE) return; // Cannot compact.
|
||||
writeTo(prev, limit - pos);
|
||||
pop();
|
||||
SegmentPool.INSTANCE.recycle(this);
|
||||
}
|
||||
|
||||
/** Moves {@code byteCount} bytes from {@code sink} to this segment. */
|
||||
// TODO: if sink has fewer bytes than this, it may be cheaper to reverse the
|
||||
// direction of the copy and swap the segments!
|
||||
public void writeTo(Segment sink, int byteCount) {
|
||||
if (byteCount + (sink.limit - sink.pos) > SIZE) throw new IllegalArgumentException();
|
||||
|
||||
if (sink.limit + byteCount > SIZE) {
|
||||
// We can't fit byteCount bytes at the sink's current position. Compact sink first.
|
||||
System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);
|
||||
sink.limit -= sink.pos;
|
||||
sink.pos = 0;
|
||||
}
|
||||
|
||||
System.arraycopy(data, pos, sink.data, sink.limit, byteCount);
|
||||
sink.limit += byteCount;
|
||||
pos += byteCount;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,8 +16,10 @@
|
||||
package com.squareup.okhttp.internal.bytes;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import org.junit.Test;
|
||||
|
||||
import static java.util.Arrays.asList;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
@@ -48,8 +50,8 @@ public final class OkBufferTest {
|
||||
@Test public void multipleSegmentBuffers() throws Exception {
|
||||
OkBuffer buffer = new OkBuffer();
|
||||
buffer.writeUtf8(repeat('a', 1000));
|
||||
buffer.writeUtf8(repeat('b', 2500));
|
||||
buffer.writeUtf8(repeat('c', 5000));
|
||||
buffer.writeUtf8(repeat('b', 2500));
|
||||
buffer.writeUtf8(repeat('c', 5000));
|
||||
buffer.writeUtf8(repeat('d', 10000));
|
||||
buffer.writeUtf8(repeat('e', 25000));
|
||||
buffer.writeUtf8(repeat('f', 50000));
|
||||
@@ -88,6 +90,98 @@ public final class OkBufferTest {
|
||||
assertEquals(0, SegmentPool.INSTANCE.byteCount);
|
||||
}
|
||||
|
||||
@Test public void moveBytesBetweenBuffersShareSegment() throws Exception {
|
||||
int size = (Segment.SIZE / 2) - 1;
|
||||
List<Integer> segmentSizes = moveBytesBetweenBuffers(repeat('a', size), repeat('b', size));
|
||||
assertEquals(asList(size * 2), segmentSizes);
|
||||
}
|
||||
|
||||
@Test public void moveBytesBetweenBuffersReassignSegment() throws Exception {
|
||||
int size = (Segment.SIZE / 2) + 1;
|
||||
List<Integer> segmentSizes = moveBytesBetweenBuffers(repeat('a', size), repeat('b', size));
|
||||
assertEquals(asList(size, size), segmentSizes);
|
||||
}
|
||||
|
||||
@Test public void moveBytesBetweenBuffersMultipleSegments() throws Exception {
|
||||
int size = 3 * Segment.SIZE + 1;
|
||||
List<Integer> segmentSizes = moveBytesBetweenBuffers(repeat('a', size), repeat('b', size));
|
||||
assertEquals(asList(Segment.SIZE, Segment.SIZE, Segment.SIZE, 1,
|
||||
Segment.SIZE, Segment.SIZE, Segment.SIZE, 1), segmentSizes);
|
||||
}
|
||||
|
||||
private List<Integer> moveBytesBetweenBuffers(String... contents) {
|
||||
StringBuilder expected = new StringBuilder();
|
||||
OkBuffer buffer = new OkBuffer();
|
||||
for (String s : contents) {
|
||||
OkBuffer source = new OkBuffer();
|
||||
source.writeUtf8(s);
|
||||
buffer.write(source, source.byteCount(), Timeout.NONE);
|
||||
expected.append(s);
|
||||
}
|
||||
List<Integer> segmentSizes = buffer.segmentSizes();
|
||||
assertEquals(expected.toString(), buffer.readUtf8(expected.length()));
|
||||
return segmentSizes;
|
||||
}
|
||||
|
||||
/** The big part of source's first segment is being moved. */
|
||||
@Test public void writeSplitSourceBufferLeft() throws Exception {
|
||||
int writeSize = Segment.SIZE / 2 + 1;
|
||||
|
||||
OkBuffer sink = new OkBuffer();
|
||||
sink.writeUtf8(repeat('b', Segment.SIZE - 10));
|
||||
|
||||
OkBuffer source = new OkBuffer();
|
||||
source.writeUtf8(repeat('a', Segment.SIZE * 2));
|
||||
sink.write(source, writeSize, Timeout.NONE);
|
||||
|
||||
assertEquals(asList(Segment.SIZE - 10, writeSize), sink.segmentSizes());
|
||||
assertEquals(asList(Segment.SIZE - writeSize, Segment.SIZE), source.segmentSizes());
|
||||
}
|
||||
|
||||
/** The big part of source's first segment is staying put. */
|
||||
@Test public void writeSplitSourceBufferRight() throws Exception {
|
||||
int writeSize = Segment.SIZE / 2 - 1;
|
||||
|
||||
OkBuffer sink = new OkBuffer();
|
||||
sink.writeUtf8(repeat('b', Segment.SIZE - 10));
|
||||
|
||||
OkBuffer source = new OkBuffer();
|
||||
source.writeUtf8(repeat('a', Segment.SIZE * 2));
|
||||
sink.write(source, writeSize, Timeout.NONE);
|
||||
|
||||
assertEquals(asList(Segment.SIZE - 10, writeSize), sink.segmentSizes());
|
||||
assertEquals(asList(Segment.SIZE - writeSize, Segment.SIZE), source.segmentSizes());
|
||||
}
|
||||
|
||||
@Test public void writePrefixDoesntSplit() throws Exception {
|
||||
OkBuffer sink = new OkBuffer();
|
||||
sink.writeUtf8(repeat('b', 10));
|
||||
|
||||
OkBuffer source = new OkBuffer();
|
||||
source.writeUtf8(repeat('a', Segment.SIZE * 2));
|
||||
sink.write(source, 20, Timeout.NONE);
|
||||
|
||||
assertEquals(asList(30), sink.segmentSizes());
|
||||
assertEquals(asList(Segment.SIZE - 20, Segment.SIZE), source.segmentSizes());
|
||||
assertEquals(30, sink.byteCount());
|
||||
assertEquals(Segment.SIZE * 2 - 20, source.byteCount());
|
||||
}
|
||||
|
||||
@Test public void writePrefixDoesntSplitButRequiresCompact() throws Exception {
|
||||
OkBuffer sink = new OkBuffer();
|
||||
sink.writeUtf8(repeat('b', Segment.SIZE - 10)); // limit = size - 10
|
||||
sink.readUtf8(Segment.SIZE - 20); // pos = size = 20
|
||||
|
||||
OkBuffer source = new OkBuffer();
|
||||
source.writeUtf8(repeat('a', Segment.SIZE * 2));
|
||||
sink.write(source, 20, Timeout.NONE);
|
||||
|
||||
assertEquals(asList(30), sink.segmentSizes());
|
||||
assertEquals(asList(Segment.SIZE - 20, Segment.SIZE), source.segmentSizes());
|
||||
assertEquals(30, sink.byteCount());
|
||||
assertEquals(Segment.SIZE * 2 - 20, source.byteCount());
|
||||
}
|
||||
|
||||
private String repeat(char c, int count) {
|
||||
char[] array = new char[count];
|
||||
Arrays.fill(array, c);
|
||||
|
||||
Reference in New Issue
Block a user