1
0
mirror of https://github.com/square/okhttp.git synced 2026-01-22 15:42:00 +03:00

Use OkBuffer in SpdyStream.

This commit is contained in:
jwilson
2014-02-12 12:35:18 -05:00
parent c40cb632ff
commit c8507d05b4
13 changed files with 164 additions and 235 deletions

View File

@@ -54,8 +54,8 @@ public final class BufferedSource implements Source {
* will block until there are bytes to read or the source is definitely
* exhausted.
*/
public boolean exhausted() throws IOException {
return buffer.byteCount() == 0 && source.read(buffer, Segment.SIZE, Deadline.NONE) == -1;
public boolean exhausted(Deadline deadline) throws IOException {
return buffer.byteCount() == 0 && source.read(buffer, Segment.SIZE, deadline) == -1;
}
/**
@@ -84,11 +84,21 @@ public final class BufferedSource implements Source {
return buffer.readShort();
}
public int readShortLe() throws IOException {
require(2, Deadline.NONE);
return buffer.readShortLe();
}
public int readInt() throws IOException {
require(4, Deadline.NONE);
return buffer.readInt();
}
public int readIntLe() throws IOException {
require(4, Deadline.NONE);
return buffer.readIntLe();
}
/**
* Reads and discards {@code byteCount} bytes from {@code source} using {@code
* buffer} as a buffer. Throws an {@link EOFException} if the source is
@@ -105,6 +115,20 @@ public final class BufferedSource implements Source {
}
}
/**
* Returns the index of {@code b} in the buffer, refilling it if necessary
* until it is found. This reads an unbounded number of bytes into the buffer.
*/
public long seek(byte b, Deadline deadline) throws IOException {
long start = 0;
long index;
while ((index = buffer.indexOf(b, start)) == -1) {
start = buffer.byteCount;
if (source.read(buffer, Segment.SIZE, deadline) == -1) throw new EOFException();
}
return index;
}
/** Returns an input stream that reads from this source. */
public InputStream inputStream() {
return new InputStream() {

View File

@@ -15,7 +15,6 @@
*/
package com.squareup.okhttp.internal.bytes;
import java.io.EOFException;
import java.io.IOException;
import java.util.zip.CRC32;
import java.util.zip.Inflater;
@@ -34,22 +33,12 @@ public final class GzipSource implements Source {
/** The current section. Always progresses forward. */
private int section = SECTION_HEADER;
/**
* This buffer is carefully shared between this source and the InflaterSource
* it wraps. In particular, this source may read more bytes than necessary for
* the GZIP header; the InflaterSource will pick those up when it starts to
* read the compressed body. And the InflaterSource may read more bytes than
* necessary for the compressed body, and this source will pick those up for
* the GZIP trailer.
*/
private final OkBuffer buffer = new OkBuffer();
/**
* Our source should yield a GZIP header (which we consume directly), followed
* by deflated bytes (which we consume via an InflaterSource), followed by a
* GZIP trailer (which we also consume directly).
*/
private final Source source;
private final BufferedSource source;
/** The inflater used to decompress the deflated body. */
private final Inflater inflater;
@@ -65,8 +54,8 @@ public final class GzipSource implements Source {
public GzipSource(Source source) throws IOException {
this.inflater = new Inflater(true);
this.source = source;
this.inflaterSource = new InflaterSource(source, inflater, buffer);
this.source = new BufferedSource(source, new OkBuffer());
this.inflaterSource = new InflaterSource(this.source, inflater);
}
@Override public long read(OkBuffer sink, long byteCount, Deadline deadline) throws IOException {
@@ -108,26 +97,26 @@ public final class GzipSource implements Source {
// +---+---+---+---+---+---+---+---+---+---+
// |ID1|ID2|CM |FLG| MTIME |XFL|OS | (more-->)
// +---+---+---+---+---+---+---+---+---+---+
require(10, deadline);
byte flags = buffer.getByte(3);
source.require(10, deadline);
byte flags = source.buffer.getByte(3);
boolean fhcrc = ((flags >> FHCRC) & 1) == 1;
if (fhcrc) updateCrc(buffer, 0, 10);
if (fhcrc) updateCrc(source.buffer, 0, 10);
short id1id2 = buffer.readShort();
short id1id2 = source.readShort();
checkEqual("ID1ID2", (short) 0x1f8b, id1id2);
buffer.skip(8);
source.skip(8, deadline);
// Skip optional extra fields.
// +---+---+=================================+
// | XLEN |...XLEN bytes of "extra field"...| (more-->)
// +---+---+=================================+
if (((flags >> FEXTRA) & 1) == 1) {
require(2, deadline);
if (fhcrc) updateCrc(buffer, 0, 2);
int xlen = buffer.readShortLe() & 0xffff;
require(xlen, deadline);
if (fhcrc) updateCrc(buffer, 0, xlen);
buffer.skip(xlen);
source.require(2, deadline);
if (fhcrc) updateCrc(source.buffer, 0, 2);
int xlen = source.buffer.readShortLe() & 0xffff;
source.require(xlen, deadline);
if (fhcrc) updateCrc(source.buffer, 0, xlen);
source.skip(xlen, deadline);
}
// Skip an optional 0-terminated name.
@@ -135,9 +124,9 @@ public final class GzipSource implements Source {
// |...original file name, zero-terminated...| (more-->)
// +=========================================+
if (((flags >> FNAME) & 1) == 1) {
long index = OkBuffers.seek(buffer, (byte) 0, source, deadline);
if (fhcrc) updateCrc(buffer, 0, index + 1);
buffer.skip(index + 1);
long index = source.seek((byte) 0, deadline);
if (fhcrc) updateCrc(source.buffer, 0, index + 1);
source.buffer.skip(index + 1);
}
// Skip an optional 0-terminated comment.
@@ -145,9 +134,9 @@ public final class GzipSource implements Source {
// |...file comment, zero-terminated...| (more-->)
// +===================================+
if (((flags >> FCOMMENT) & 1) == 1) {
long index = OkBuffers.seek(buffer, (byte) 0, source, deadline);
if (fhcrc) updateCrc(buffer, 0, index + 1);
buffer.skip(index + 1);
long index = source.seek((byte) 0, deadline);
if (fhcrc) updateCrc(source.buffer, 0, index + 1);
source.skip(index + 1, deadline);
}
// Confirm the optional header CRC.
@@ -155,7 +144,7 @@ public final class GzipSource implements Source {
// | CRC16 |
// +---+---+
if (fhcrc) {
checkEqual("FHCRC", buffer.readShortLe(), (short) crc.getValue());
checkEqual("FHCRC", source.readShortLe(), (short) crc.getValue());
crc.reset();
}
}
@@ -165,9 +154,8 @@ public final class GzipSource implements Source {
// +---+---+---+---+---+---+---+---+
// | CRC32 | ISIZE |
// +---+---+---+---+---+---+---+---+
require(8, deadline);
checkEqual("CRC", buffer.readIntLe(), (int) crc.getValue());
checkEqual("ISIZE", buffer.readIntLe(), inflater.getTotalOut());
checkEqual("CRC", source.readIntLe(), (int) crc.getValue());
checkEqual("ISIZE", source.readIntLe(), inflater.getTotalOut());
}
@Override public void close(Deadline deadline) throws IOException {
@@ -187,13 +175,6 @@ public final class GzipSource implements Source {
}
}
/** Fills the buffer with at least {@code byteCount} bytes. */
private void require(int byteCount, Deadline deadline) throws IOException {
while (buffer.byteCount < byteCount) {
if (source.read(buffer, Segment.SIZE, deadline) == -1) throw new EOFException();
}
}
private void checkEqual(String name, int expected, int actual) throws IOException {
if (actual != expected) {
throw new IOException(String.format(

View File

@@ -22,10 +22,8 @@ import java.util.zip.Inflater;
/** A source that inflates another source. */
public final class InflaterSource implements Source {
private final Source source;
private final BufferedSource source;
private final Inflater inflater;
/** This holds bytes read from the source, but not yet inflated. */
private final OkBuffer buffer;
/**
* When we call Inflater.setInput(), the inflater keeps our byte array until
@@ -36,15 +34,19 @@ public final class InflaterSource implements Source {
private boolean closed;
public InflaterSource(Source source, Inflater inflater) {
this(source, inflater, new OkBuffer());
this(new BufferedSource(source, new OkBuffer()), inflater);
}
InflaterSource(Source source, Inflater inflater, OkBuffer buffer) {
/**
* This package-private constructor shares a buffer with its trusted caller.
* In general we can't share a BufferedSource because the inflater holds input
* bytes until they are inflated.
*/
InflaterSource(BufferedSource source, Inflater inflater) {
if (source == null) throw new IllegalArgumentException("source == null");
if (inflater == null) throw new IllegalArgumentException("inflater == null");
this.source = source;
this.inflater = inflater;
this.buffer = buffer;
}
@Override public long read(
@@ -87,13 +89,11 @@ public final class InflaterSource implements Source {
releaseInflatedBytes();
if (inflater.getRemaining() != 0) throw new IllegalStateException("?"); // TODO: possible?
// Refill the buffer with compressed data from the source.
if (buffer.byteCount == 0) {
if (source.read(buffer, Segment.SIZE, deadline) == -1) return true;
}
// If there are compressed bytes in the source, assign them to the inflater.
if (source.exhausted(deadline)) return true;
// Assign buffer bytes to the inflater.
Segment head = buffer.head;
Segment head = source.buffer.head;
bufferBytesHeldByInflater = head.limit - head.pos;
inflater.setInput(head.data, head.pos, bufferBytesHeldByInflater);
return false;
@@ -104,13 +104,12 @@ public final class InflaterSource implements Source {
if (bufferBytesHeldByInflater == 0) return;
int toRelease = bufferBytesHeldByInflater - inflater.getRemaining();
bufferBytesHeldByInflater -= toRelease;
buffer.skip(toRelease);
source.buffer.skip(toRelease);
}
@Override public void close(Deadline deadline) throws IOException {
if (closed) return;
inflater.end();
buffer.clear();
closed = true;
source.close(deadline);
}

View File

@@ -15,7 +15,6 @@
*/
package com.squareup.okhttp.internal.bytes;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -26,22 +25,6 @@ public final class OkBuffers {
private OkBuffers() {
}
/**
* Returns the index of {@code b} in {@code buffer}, refilling it if necessary
* until it is found. This reads an unbounded number of bytes into {@code
* buffer}.
*/
public static long seek(OkBuffer buffer, byte b, Source source, Deadline deadline)
throws IOException {
long start = 0;
long index;
while ((index = buffer.indexOf(b, start)) == -1) {
start = buffer.byteCount;
if (source.read(buffer, Segment.SIZE, deadline) == -1) throw new EOFException();
}
return index;
}
/** Returns a sink that writes to {@code out}. */
public static Sink sink(final OutputStream out) {
return new Sink() {

View File

@@ -16,10 +16,10 @@
package com.squareup.okhttp.internal.spdy;
import com.squareup.okhttp.internal.bytes.BufferedSource;
import com.squareup.okhttp.internal.bytes.ByteString;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
/** Reads transport frames for SPDY/3 or HTTP/2. */
@@ -28,7 +28,8 @@ public interface FrameReader extends Closeable {
boolean nextFrame(Handler handler) throws IOException;
public interface Handler {
void data(boolean inFinished, int streamId, InputStream in, int length) throws IOException;
void data(boolean inFinished, int streamId, BufferedSource source, int length)
throws IOException;
/**
* Create or update incoming headers, creating the corresponding streams

View File

@@ -3,6 +3,7 @@ package com.squareup.okhttp.internal.spdy;
import com.squareup.okhttp.internal.BitArray;
import com.squareup.okhttp.internal.bytes.BufferedSource;
import com.squareup.okhttp.internal.bytes.ByteString;
import com.squareup.okhttp.internal.bytes.Deadline;
import com.squareup.okhttp.internal.bytes.OkBuffer;
import com.squareup.okhttp.internal.bytes.Source;
import java.io.IOException;
@@ -180,7 +181,7 @@ final class HpackDraft05 {
* set of emitted headers.
*/
void readHeaders() throws IOException {
while (!source.exhausted()) {
while (!source.exhausted(Deadline.NONE)) {
int b = source.readByte() & 0xff;
if (b == 0x80) { // 10000000
clearReferenceSet();

View File

@@ -187,7 +187,7 @@ public final class Http20Draft09 implements Variant {
throws IOException {
boolean inFinished = (flags & FLAG_END_STREAM) != 0;
// TODO: checkState open or half-closed (local) or raise STREAM_CLOSED
handler.data(inFinished, streamId, source.inputStream(), length);
handler.data(inFinished, streamId, source, length);
}
private void readPriority(Handler handler, short length, byte flags, int streamId)

View File

@@ -189,7 +189,7 @@ final class Spdy3 implements Variant {
} else {
int streamId = w1 & 0x7fffffff;
boolean inFinished = (flags & FLAG_FIN) != 0;
handler.data(inFinished, streamId, source.inputStream(), length);
handler.data(inFinished, streamId, source, length);
return true;
}
}

View File

@@ -18,7 +18,9 @@ package com.squareup.okhttp.internal.spdy;
import com.squareup.okhttp.Protocol;
import com.squareup.okhttp.internal.NamedRunnable;
import com.squareup.okhttp.internal.Util;
import com.squareup.okhttp.internal.bytes.BufferedSource;
import com.squareup.okhttp.internal.bytes.ByteString;
import com.squareup.okhttp.internal.bytes.Deadline;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
@@ -526,15 +528,15 @@ public final class SpdyConnection implements Closeable {
}
}
@Override public void data(boolean inFinished, int streamId, InputStream in, int length)
@Override public void data(boolean inFinished, int streamId, BufferedSource source, int length)
throws IOException {
SpdyStream dataStream = getStream(streamId);
if (dataStream == null) {
writeSynResetLater(streamId, ErrorCode.INVALID_STREAM);
Util.skipByReading(in, length);
source.skip(length, Deadline.NONE);
return;
}
dataStream.receiveData(in, length);
dataStream.receiveData(source, length);
if (inFinished) {
dataStream.receiveFin();
}

View File

@@ -17,6 +17,11 @@
package com.squareup.okhttp.internal.spdy;
import com.squareup.okhttp.internal.Util;
import com.squareup.okhttp.internal.bytes.BufferedSource;
import com.squareup.okhttp.internal.bytes.Deadline;
import com.squareup.okhttp.internal.bytes.OkBuffer;
import com.squareup.okhttp.internal.bytes.Source;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
@@ -35,9 +40,9 @@ public final class SpdyStream {
// blocking operations are performed while the lock is held.
/**
* The total number of bytes consumed by the application
* (with {@link SpdyDataInputStream#read}), but not yet acknowledged by
* sending a {@code WINDOW_UPDATE} frame on this stream.
* The total number of bytes consumed by the application (with {@link
* SpdyDataSource#read}), but not yet acknowledged by sending a {@code
* WINDOW_UPDATE} frame on this stream.
*/
// Visible for testing
long unacknowledgedBytesRead = 0;
@@ -61,7 +66,8 @@ public final class SpdyStream {
/** Headers sent in the stream reply. Null if reply is either not sent or not sent yet. */
private List<Header> responseHeaders;
private final SpdyDataInputStream in;
private final SpdyDataSource source;
private final InputStream in;
final SpdyDataOutputStream out;
/**
@@ -78,9 +84,10 @@ public final class SpdyStream {
this.id = id;
this.connection = connection;
this.bytesLeftInWriteWindow = connection.peerSettings.getInitialWindowSize();
this.in = new SpdyDataInputStream(connection.okHttpSettings.getInitialWindowSize());
this.source = new SpdyDataSource(connection.okHttpSettings.getInitialWindowSize());
this.in = new BufferedSource(source, new OkBuffer()).inputStream();
this.out = new SpdyDataOutputStream();
this.in.finished = inFinished;
this.source.finished = inFinished;
this.out.finished = outFinished;
this.priority = priority;
this.requestHeaders = requestHeaders;
@@ -100,7 +107,9 @@ public final class SpdyStream {
if (errorCode != null) {
return false;
}
if ((in.finished || in.closed) && (out.finished || out.closed) && responseHeaders != null) {
if ((source.finished || source.closed)
&& (out.finished || out.closed)
&& responseHeaders != null) {
return false;
}
return true;
@@ -251,7 +260,7 @@ public final class SpdyStream {
if (this.errorCode != null) {
return false;
}
if (in.finished && out.finished) {
if (source.finished && out.finished) {
return false;
}
this.errorCode = errorCode;
@@ -292,16 +301,16 @@ public final class SpdyStream {
}
}
void receiveData(InputStream in, int length) throws IOException {
void receiveData(BufferedSource in, int length) throws IOException {
assert (!Thread.holdsLock(SpdyStream.this));
this.in.receive(in, length);
this.source.receive(in, length);
}
void receiveFin() {
assert (!Thread.holdsLock(SpdyStream.this));
boolean open;
synchronized (this) {
this.in.finished = true;
this.source.finished = true;
open = isOpen();
notifyAll();
}
@@ -322,36 +331,19 @@ public final class SpdyStream {
}
/**
* An input stream that reads the incoming data frames of a stream. Although
* this class uses synchronization to safely receive incoming data frames,
* it is not intended for use by multiple readers.
* A source that reads the incoming data frames of a stream. Although this
* class uses synchronization to safely receive incoming data frames, it is
* not intended for use by multiple readers.
*/
private final class SpdyDataInputStream extends InputStream {
private final class SpdyDataSource implements Source {
/** Buffer to receive data from the network into. Only accessed by the reader thread. */
private final OkBuffer receiveBuffer = new OkBuffer();
// Store incoming data bytes in a circular buffer. When the buffer is
// empty, pos == -1. Otherwise pos is the first byte to read and limit
// is the first byte to write.
//
// { - - - X X X X - - - }
// ^ ^
// pos limit
//
// { X X X - - - - X X X }
// ^ ^
// limit pos
private final byte[] buffer;
/** Buffer with readable data. Guarded by SpdyStream.this. */
private final OkBuffer readBuffer = new OkBuffer();
private SpdyDataInputStream(int bufferLength) {
// TODO: We probably need to change to growable buffers here pretty soon.
// Otherwise we have a performance problem where we pay for 64 KiB even if we aren't using it.
buffer = connection.bufferPool.getBuf(bufferLength);
}
/** the next byte to be read, or -1 if the buffer is empty. Never buffer.length */
private int pos = -1;
/** the last byte to be read. Never buffer.length */
private int limit;
/** Maximum number of bytes to buffer before reporting a flow control error. */
private final long maxByteCount;
/** True if the caller has closed this stream. */
private boolean closed;
@@ -362,75 +354,42 @@ public final class SpdyStream {
*/
private boolean finished;
@Override public int available() throws IOException {
synchronized (SpdyStream.this) {
checkNotClosed();
if (pos == -1) {
return 0;
} else if (limit > pos) {
return limit - pos;
} else {
return limit + (buffer.length - pos);
}
}
private SpdyDataSource(long maxByteCount) {
this.maxByteCount = maxByteCount;
}
@Override public int read() throws IOException {
return Util.readSingleByte(this);
}
@Override public long read(OkBuffer sink, long byteCount, Deadline deadline)
throws IOException {
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
@Override public int read(byte[] b, int offset, int count) throws IOException {
int copied = 0;
long read;
synchronized (SpdyStream.this) {
checkOffsetAndCount(b.length, offset, count);
waitUntilReadable();
checkNotClosed();
if (readBuffer.byteCount() == 0) return -1; // This source is exhausted.
if (pos == -1) {
return -1;
}
// drain from [pos..buffer.length)
if (limit <= pos) {
int bytesToCopy = Math.min(count, buffer.length - pos);
System.arraycopy(buffer, pos, b, offset, bytesToCopy);
pos += bytesToCopy;
copied += bytesToCopy;
if (pos == buffer.length) {
pos = 0;
}
}
// drain from [pos..limit)
if (copied < count) {
int bytesToCopy = Math.min(limit - pos, count - copied);
System.arraycopy(buffer, pos, b, offset + copied, bytesToCopy);
pos += bytesToCopy;
copied += bytesToCopy;
}
// Move bytes from the read buffer into the caller's buffer.
read = readBuffer.read(sink, Math.min(byteCount, readBuffer.byteCount()), deadline);
// Flow control: notify the peer that we're ready for more data!
unacknowledgedBytesRead += copied;
unacknowledgedBytesRead += read;
if (unacknowledgedBytesRead >= connection.okHttpSettings.getInitialWindowSize() / 2) {
connection.writeWindowUpdateLater(id, unacknowledgedBytesRead);
unacknowledgedBytesRead = 0;
}
if (pos == limit) {
pos = -1;
limit = 0;
}
}
// Update connection.unacknowledgedBytesRead outside the stream lock.
synchronized (connection) { // Multiple application threads may hit this section.
connection.unacknowledgedBytesRead += copied;
connection.unacknowledgedBytesRead += read;
if (connection.unacknowledgedBytesRead
>= connection.okHttpSettings.getInitialWindowSize() / 2) {
connection.writeWindowUpdateLater(0, connection.unacknowledgedBytesRead);
connection.unacknowledgedBytesRead = 0;
}
}
return copied;
return read;
}
/**
@@ -446,7 +405,7 @@ public final class SpdyStream {
remaining = readTimeoutMillis;
}
try {
while (pos == -1 && !finished && !closed && errorCode == null) {
while (readBuffer.byteCount() == 0 && !finished && !closed && errorCode == null) {
if (readTimeoutMillis == 0) {
SpdyStream.this.wait();
} else if (remaining > 0) {
@@ -461,71 +420,51 @@ public final class SpdyStream {
}
}
void receive(InputStream in, int byteCount) throws IOException {
void receive(BufferedSource in, long byteCount) throws IOException {
assert (!Thread.holdsLock(SpdyStream.this));
if (byteCount == 0) {
return;
}
int pos;
int limit;
int firstNewByte;
boolean finished;
boolean flowControlError;
synchronized (SpdyStream.this) {
finished = this.finished;
pos = this.pos;
firstNewByte = this.limit;
limit = this.limit;
flowControlError = byteCount > buffer.length - available();
}
// If the peer sends more data than we can handle, discard it and close the connection.
if (flowControlError) {
Util.skipByReading(in, byteCount);
closeLater(ErrorCode.FLOW_CONTROL_ERROR);
return;
}
// Discard data received after the stream is finished. It's probably a benign race.
if (finished) {
Util.skipByReading(in, byteCount);
return;
}
// Fill the buffer without holding any locks. First fill [limit..buffer.length) if that
// won't overwrite unread data. Then fill [limit..pos). We can't hold a lock, otherwise
// writes will be blocked until reads complete.
if (pos < limit) {
int firstCopyCount = Math.min(byteCount, buffer.length - limit);
Util.readFully(in, buffer, limit, firstCopyCount);
limit += firstCopyCount;
byteCount -= firstCopyCount;
if (limit == buffer.length) {
limit = 0;
while (byteCount > 0) {
boolean finished;
boolean flowControlError;
synchronized (SpdyStream.this) {
finished = this.finished;
flowControlError = byteCount + readBuffer.byteCount() > maxByteCount;
}
}
if (byteCount > 0) {
Util.readFully(in, buffer, limit, byteCount);
limit += byteCount;
}
synchronized (SpdyStream.this) {
// Update the new limit, and mark the position as readable if necessary.
this.limit = limit;
if (this.pos == -1) {
this.pos = firstNewByte;
SpdyStream.this.notifyAll();
// If the peer sends more data than we can handle, discard it and close the connection.
if (flowControlError) {
in.skip(byteCount, Deadline.NONE);
closeLater(ErrorCode.FLOW_CONTROL_ERROR);
return;
}
// Discard data received after the stream is finished. It's probably a benign race.
if (finished) {
in.skip(byteCount, Deadline.NONE);
return;
}
// Fill the receive buffer without holding any locks.
long read = in.read(receiveBuffer, byteCount, Deadline.NONE);
if (read == -1) throw new EOFException();
byteCount -= read;
// Move the received data to the read buffer to the reader can read it.
synchronized (SpdyStream.this) {
boolean wasEmpty = readBuffer.byteCount() == 0;
readBuffer.write(receiveBuffer, receiveBuffer.byteCount(), Deadline.NONE);
if (wasEmpty) {
SpdyStream.this.notifyAll();
}
}
}
}
@Override public void close() throws IOException {
@Override public void close(Deadline deadline) throws IOException {
synchronized (SpdyStream.this) {
closed = true;
readBuffer.clear();
SpdyStream.this.notifyAll();
SpdyStream.this.connection.bufferPool.returnBuf(buffer);
}
cancelStreamIfNecessary();
}
@@ -545,7 +484,7 @@ public final class SpdyStream {
boolean open;
boolean cancel;
synchronized (this) {
cancel = !in.finished && in.closed && (out.finished || out.closed);
cancel = !source.finished && source.closed && (out.finished || out.closed);
open = isOpen();
}
if (cancel) {

View File

@@ -15,15 +15,15 @@
*/
package com.squareup.okhttp.internal.spdy;
import com.squareup.okhttp.internal.bytes.BufferedSource;
import com.squareup.okhttp.internal.bytes.ByteString;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import static org.junit.Assert.fail;
class BaseTestHandler implements FrameReader.Handler {
@Override public void data(boolean inFinished, int streamId, InputStream in, int length)
@Override public void data(boolean inFinished, int streamId, BufferedSource source, int length)
throws IOException {
fail();
}

View File

@@ -16,12 +16,12 @@
package com.squareup.okhttp.internal.spdy;
import com.squareup.okhttp.internal.Util;
import com.squareup.okhttp.internal.bytes.BufferedSource;
import com.squareup.okhttp.internal.bytes.ByteString;
import com.squareup.okhttp.internal.bytes.OkBuffer;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.List;
import org.junit.Test;
@@ -348,14 +348,13 @@ public class Http20Draft09Test {
FrameReader fr = newReader(out);
fr.nextFrame(new BaseTestHandler() {
@Override public void data(boolean inFinished, int streamId, InputStream in, int length)
throws IOException {
@Override public void data(
boolean inFinished, int streamId, BufferedSource source, int length) throws IOException {
assertFalse(inFinished);
assertEquals(expectedStreamId, streamId);
assertEquals(16383, length);
byte[] data = new byte[length];
Util.readFully(in, data);
for (byte b : data){
ByteString data = source.readByteString(length);
for (byte b : data.toByteArray()){
assertEquals(2, b);
}
}

View File

@@ -17,6 +17,7 @@
package com.squareup.okhttp.internal.spdy;
import com.squareup.okhttp.internal.Util;
import com.squareup.okhttp.internal.bytes.BufferedSource;
import com.squareup.okhttp.internal.bytes.ByteString;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
@@ -39,7 +40,7 @@ public final class MockSpdyPeer implements Closeable {
private boolean client = false;
private Variant variant = new Spdy3();
private final ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
private FrameWriter frameWriter = variant.newWriter(bytesOut, client);;
private FrameWriter frameWriter = variant.newWriter(bytesOut, client);
private final List<OutFrame> outFrames = new ArrayList<OutFrame>();
private final BlockingQueue<InFrame> inFrames = new LinkedBlockingQueue<InFrame>();
private int port;
@@ -232,14 +233,13 @@ public final class MockSpdyPeer implements Closeable {
this.headersMode = headersMode;
}
@Override public void data(boolean inFinished, int streamId, InputStream in, int length)
@Override public void data(boolean inFinished, int streamId, BufferedSource source, int length)
throws IOException {
if (this.type != -1) throw new IllegalStateException();
this.type = Spdy3.TYPE_DATA;
this.inFinished = inFinished;
this.streamId = streamId;
this.data = new byte[length];
Util.readFully(in, this.data);
this.data = source.readByteString(length).toByteArray();
}
@Override public void rstStream(int streamId, ErrorCode errorCode) {