diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/Util.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/Util.java index 2b4638fd8..999b0fe61 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/Util.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/Util.java @@ -18,6 +18,7 @@ package com.squareup.okhttp.internal; import com.squareup.okhttp.internal.bytes.Deadline; import com.squareup.okhttp.internal.bytes.OkBuffer; +import com.squareup.okhttp.internal.bytes.Sink; import com.squareup.okhttp.internal.bytes.Source; import com.squareup.okhttp.internal.spdy.Header; import java.io.ByteArrayInputStream; @@ -122,6 +123,21 @@ public final class Util { } } + /** + * Closes {@code sink}, ignoring any checked exceptions. Does nothing if + * {@code sink} is null. + */ + public static void closeQuietly(Sink sink) { + if (sink != null) { + try { + sink.close(Deadline.NONE); + } catch (RuntimeException rethrown) { + throw rethrown; + } catch (Exception ignored) { + } + } + } + /** * Closes {@code socket}, ignoring any checked exceptions. Does nothing if * {@code socket} is null. diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/BufferedSink.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/BufferedSink.java index cbd271650..3d5f00e4b 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/BufferedSink.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/BufferedSink.java @@ -55,6 +55,12 @@ public final class BufferedSink implements Sink { emitCompleteSegments(deadline); } + public void write(byte[] data, int offset, int byteCount, Deadline deadline) throws IOException { + if (closed) throw new IllegalStateException("closed"); + buffer.write(data, offset, byteCount); + emitCompleteSegments(deadline); + } + public void writeByte(int b, Deadline deadline) throws IOException { if (closed) throw new IllegalStateException("closed"); buffer.writeByte(b); @@ -74,16 +80,8 @@ public final class BufferedSink implements Sink { } void emitCompleteSegments(Deadline deadline) throws IOException { - long byteCount = buffer.byteCount; + long byteCount = buffer.completeSegmentByteCount(); if (byteCount == 0) return; - - // Omit the tail if it's still writable. - Segment tail = buffer.head.prev; - if (tail.limit < Segment.SIZE) { - byteCount -= tail.limit - tail.pos; - if (byteCount == 0) return; - } - sink.write(buffer, byteCount, deadline); } @@ -121,6 +119,7 @@ public final class BufferedSink implements Sink { if (buffer.byteCount > 0) { sink.write(buffer, buffer.byteCount, deadline); } + sink.flush(deadline); } @Override public void close(Deadline deadline) throws IOException { diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/OkBuffer.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/OkBuffer.java index 835290849..9ed0dfa7c 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/OkBuffer.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/OkBuffer.java @@ -39,7 +39,7 @@ import static com.squareup.okhttp.internal.Util.checkOffsetAndCount; * returning it to you. Even if you're going to write over that space anyway. * This class avoids zero-fill and GC churn by pooling byte arrays. */ -public final class OkBuffer implements Source, Sink { +public final class OkBuffer implements Source, Sink, Cloneable { private static final char[] HEX_DIGITS = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' }; @@ -54,6 +54,24 @@ public final class OkBuffer implements Source, Sink { return byteCount; } + /** + * Returns the number of bytes in segments that are not writable. This is the + * number of bytes that can be flushed immediately to an underlying sink + * without harming throughput. + */ + public long completeSegmentByteCount() { + long result = byteCount; + if (result == 0) return 0; + + // Omit the tail if it's still writable. + Segment tail = head.prev; + if (tail.limit < Segment.SIZE) { + result -= tail.limit - tail.pos; + } + + return result; + } + /** Removes a byte from the front of this buffer and returns it. */ public byte readByte() { if (byteCount < 1) throw new IllegalArgumentException("byteCount < 1: " + byteCount); @@ -481,4 +499,17 @@ public final class OkBuffer implements Source, Sink { } return new String(result); } + + /** Returns a deep copy of this buffer. */ + @Override public OkBuffer clone() { + OkBuffer result = new OkBuffer(); + if (byteCount() == 0) return result; + + result.write(head.data, head.pos, head.limit - head.pos); + for (Segment s = head.next; s != head; s = s.next) { + result.write(s.data, s.pos, s.limit - s.pos); + } + + return result; + } } diff --git a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/bytes/BufferedSinkTest.java b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/bytes/BufferedSinkTest.java new file mode 100644 index 000000000..f5508b347 --- /dev/null +++ b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/bytes/BufferedSinkTest.java @@ -0,0 +1,58 @@ +/* + * Copyright (C) 2014 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.squareup.okhttp.internal.bytes; + +import java.util.Arrays; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public final class BufferedSinkTest { + @Test public void bytesEmittedToSinkWithFlush() throws Exception { + OkBuffer sink = new OkBuffer(); + BufferedSink bufferedSink = new BufferedSink(sink); + bufferedSink.writeUtf8("abc", Deadline.NONE); + bufferedSink.flush(Deadline.NONE); + assertEquals(3, sink.byteCount()); + } + + @Test public void bytesNotEmittedToSinkWithoutFlush() throws Exception { + OkBuffer sink = new OkBuffer(); + BufferedSink bufferedSink = new BufferedSink(sink); + bufferedSink.writeUtf8("abc", Deadline.NONE); + assertEquals(0, sink.byteCount()); + } + + @Test public void completeSegmentsEmitted() throws Exception { + OkBuffer sink = new OkBuffer(); + BufferedSink bufferedSink = new BufferedSink(sink); + bufferedSink.writeUtf8(repeat('a', Segment.SIZE * 3), Deadline.NONE); + assertEquals(Segment.SIZE * 3, sink.byteCount()); + } + + @Test public void incompleteSegmentsNotEmitted() throws Exception { + OkBuffer sink = new OkBuffer(); + BufferedSink bufferedSink = new BufferedSink(sink); + bufferedSink.writeUtf8(repeat('a', Segment.SIZE * 3 - 1), Deadline.NONE); + assertEquals(Segment.SIZE * 2, sink.byteCount()); + } + + private String repeat(char c, int count) { + char[] array = new char[count]; + Arrays.fill(array, c); + return new String(array); + } +} diff --git a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/bytes/OkBufferTest.java b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/bytes/OkBufferTest.java index d05943e13..97703b95c 100644 --- a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/bytes/OkBufferTest.java +++ b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/bytes/OkBufferTest.java @@ -48,6 +48,23 @@ public final class OkBufferTest { } } + @Test public void completeSegmentByteCountOnEmptyBuffer() throws Exception { + OkBuffer buffer = new OkBuffer(); + assertEquals(0, buffer.completeSegmentByteCount()); + } + + @Test public void completeSegmentByteCountOnBufferWithFullSegments() throws Exception { + OkBuffer buffer = new OkBuffer(); + buffer.writeUtf8(repeat('a', Segment.SIZE * 4)); + assertEquals(Segment.SIZE * 4, buffer.completeSegmentByteCount()); + } + + @Test public void completeSegmentByteCountOnBufferWithIncompleteTailSegment() throws Exception { + OkBuffer buffer = new OkBuffer(); + buffer.writeUtf8(repeat('a', Segment.SIZE * 4 - 10)); + assertEquals(Segment.SIZE * 3, buffer.completeSegmentByteCount()); + } + @Test public void readUtf8SpansSegments() throws Exception { OkBuffer buffer = new OkBuffer(); buffer.writeUtf8(repeat('a', Segment.SIZE * 2)); @@ -684,6 +701,51 @@ public final class OkBufferTest { assertEquals(2, source.byteCount()); } + @Test public void cloneDoesNotObserveWritesToOriginal() throws Exception { + OkBuffer original = new OkBuffer(); + OkBuffer clone = original.clone(); + original.writeUtf8("abc"); + assertEquals(0, clone.byteCount()); + } + + @Test public void cloneDoesNotObserveReadsFromOriginal() throws Exception { + OkBuffer original = new OkBuffer(); + original.writeUtf8("abc"); + OkBuffer clone = original.clone(); + assertEquals("abc", original.readUtf8(3)); + assertEquals(3, clone.byteCount()); + assertEquals("ab", clone.readUtf8(2)); + } + + @Test public void originalDoesNotObserveWritesToClone() throws Exception { + OkBuffer original = new OkBuffer(); + OkBuffer clone = original.clone(); + clone.writeUtf8("abc"); + assertEquals(0, original.byteCount()); + } + + @Test public void originalDoesNotObserveReadsFromClone() throws Exception { + OkBuffer original = new OkBuffer(); + original.writeUtf8("abc"); + OkBuffer clone = original.clone(); + assertEquals("abc", clone.readUtf8(3)); + assertEquals(3, original.byteCount()); + assertEquals("ab", original.readUtf8(2)); + } + + @Test public void cloneMultipleSegments() throws Exception { + OkBuffer original = new OkBuffer(); + original.writeUtf8(repeat('a', Segment.SIZE * 3)); + OkBuffer clone = original.clone(); + original.writeUtf8(repeat('b', Segment.SIZE * 3)); + clone.writeUtf8(repeat('c', Segment.SIZE * 3)); + + assertEquals(repeat('a', Segment.SIZE * 3) + repeat('b', Segment.SIZE * 3), + original.readUtf8(Segment.SIZE * 6)); + assertEquals(repeat('a', Segment.SIZE * 3) + repeat('c', Segment.SIZE * 3), + clone.readUtf8(Segment.SIZE * 6)); + } + private String repeat(char c, int count) { char[] array = new char[count]; Arrays.fill(array, c);