1
0
mirror of https://github.com/square/okhttp.git synced 2026-01-27 04:22:07 +03:00

Merge pull request #71 from square/jwilson/errors

Improve SPDY error handling and concurrency.
This commit is contained in:
Jake Wharton
2012-12-29 22:23:16 -08:00
5 changed files with 218 additions and 58 deletions

View File

@@ -20,7 +20,6 @@ import com.squareup.okhttp.internal.net.spdy.SpdyConnection;
import com.squareup.okhttp.internal.net.spdy.SpdyStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.CacheRequest;
import java.util.List;
@@ -68,16 +67,10 @@ public final class SpdyTransport implements Transport {
@Override public ResponseHeaders readResponseHeaders() throws IOException {
// TODO: fix the SPDY implementation so this throws a (buffered) IOException
try {
List<String> nameValueBlock = stream.getResponseHeaders();
RawHeaders rawHeaders = RawHeaders.fromNameValueBlock(nameValueBlock);
rawHeaders.computeResponseStatusLineFromSpdyHeaders();
return new ResponseHeaders(httpEngine.uri, rawHeaders);
} catch (InterruptedException e) {
InterruptedIOException rethrow = new InterruptedIOException();
rethrow.initCause(e);
throw rethrow;
}
List<String> nameValueBlock = stream.getResponseHeaders();
RawHeaders rawHeaders = RawHeaders.fromNameValueBlock(nameValueBlock);
rawHeaders.computeResponseStatusLineFromSpdyHeaders();
return new ResponseHeaders(httpEngine.uri, rawHeaders);
}
@Override public InputStream getTransferStream(CacheRequest cacheRequest) throws IOException {

View File

@@ -22,6 +22,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ProtocolException;
import java.net.Socket;
import java.util.HashMap;
import java.util.List;
@@ -301,7 +302,12 @@ public final class SpdyConnection implements Closeable {
throws IOException {
SpdyStream dataStream = getStream(streamId);
if (dataStream != null) {
dataStream.receiveData(in, flags, length);
try {
dataStream.receiveData(in, flags, length);
} catch (ProtocolException e) {
Streams.skipByReading(in, length);
dataStream.closeLater(SpdyStream.RST_FLOW_CONTROL_ERROR);
}
} else {
writeSynResetLater(streamId, SpdyStream.RST_INVALID_STREAM);
Streams.skipByReading(in, length);
@@ -317,14 +323,8 @@ public final class SpdyConnection implements Closeable {
previous = streams.put(streamId, synStream);
}
if (previous != null) {
writeExecutor.execute(new Runnable() {
@Override public void run() {
try {
previous.close(SpdyStream.RST_PROTOCOL_ERROR);
} catch (IOException ignored) {
}
}
});
previous.closeLater(SpdyStream.RST_PROTOCOL_ERROR);
removeStream(streamId);
return;
}
callbackExecutor.execute(new Runnable() {
@@ -343,7 +343,11 @@ public final class SpdyConnection implements Closeable {
SpdyStream replyStream = getStream(streamId);
if (replyStream != null) {
// TODO: honor incoming FLAG_FIN.
replyStream.receiveReply(nameValueBlock);
try {
replyStream.receiveReply(nameValueBlock);
} catch (ProtocolException e) {
replyStream.closeLater(SpdyStream.RST_PROTOCOL_ERROR);
}
} else {
writeSynResetLater(streamId, SpdyStream.RST_INVALID_STREAM);
}

View File

@@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.ProtocolException;
import static java.nio.ByteOrder.BIG_ENDIAN;
import java.util.List;
@@ -101,13 +102,20 @@ public final class SpdyStream {
* Returns the stream's response headers, blocking if necessary if they
* have not been received yet.
*/
public synchronized List<String> getResponseHeaders() throws InterruptedException {
while (responseHeaders == null && rstStatusCode == -1) {
wait();
public synchronized List<String> getResponseHeaders() throws IOException {
try {
while (responseHeaders == null && rstStatusCode == -1) {
wait();
}
if (responseHeaders != null) {
return responseHeaders;
}
throw new IOException("stream was reset: " + rstStatusCode);
} catch (InterruptedException e) {
InterruptedIOException rethrow = new InterruptedIOException();
rethrow.initCause(e);
throw rethrow;
}
// TODO: throw InterruptedIOException?
// TODO: throw if responseHeaders == null
return responseHeaders;
}
/**
@@ -176,24 +184,41 @@ public final class SpdyStream {
* Abnormally terminate this stream.
*/
public void close(int rstStatusCode) throws IOException {
if (!closeInternal(rstStatusCode)) {
return; // Already closed.
}
connection.writeSynReset(id, rstStatusCode);
}
void closeLater(int rstStatusCode) {
if (!closeInternal(rstStatusCode)) {
return; // Already closed.
}
connection.writeSynResetLater(id, rstStatusCode);
}
/**
* Returns true if this stream was closed.
*/
private boolean closeInternal(int rstStatusCode) {
assert (!Thread.holdsLock(this));
synchronized (this) {
// TODO: no-op if inFinished == true and outFinished == true ?
if (this.rstStatusCode != -1) {
return; // Already closed.
return false;
}
this.rstStatusCode = rstStatusCode;
in.finished = true;
out.finished = true;
notifyAll();
}
connection.writeSynReset(id, rstStatusCode);
connection.removeStream(id);
return true;
}
synchronized void receiveReply(List<String> strings) throws IOException {
if (!isLocallyInitiated() || responseHeaders != null) {
throw new IOException(); // TODO: send RST
throw new ProtocolException();
}
responseHeaders = strings;
notifyAll();
@@ -328,21 +353,28 @@ public final class SpdyStream {
void receive(InputStream in, int byteCount) throws IOException {
assert (!Thread.holdsLock(SpdyStream.this));
if (byteCount == 0) {
return;
}
int pos;
int limit;
int firstNewByte;
boolean finished;
synchronized (SpdyStream.this) {
if (finished) {
return; // ignore this; probably a benign race
}
if (byteCount == 0) {
return;
}
if (byteCount > buffer.length - available()) {
throw new IOException(); // TODO: RST the stream
}
finished = this.finished;
pos = this.pos;
firstNewByte = limit = this.limit;
firstNewByte = this.limit;
limit = this.limit;
if (byteCount > buffer.length - available()) {
throw new ProtocolException();
}
}
// Discard data received after the stream is finished. It's probably a benign race.
if (finished) {
Streams.skipByReading(in, byteCount);
return;
}
// Fill the buffer without holding any locks. First fill [limit..buffer.length) if that

View File

@@ -36,6 +36,8 @@ import java.util.concurrent.LinkedBlockingQueue;
*/
public final class MockSpdyPeer {
private int frameCount = 0;
private final ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
private final SpdyWriter spdyWriter = new SpdyWriter(bytesOut);
private final List<OutFrame> outFrames = new ArrayList<OutFrame>();
private final BlockingQueue<InFrame> inFrames = new LinkedBlockingQueue<InFrame>();
private int port;
@@ -47,9 +49,9 @@ public final class MockSpdyPeer {
}
public SpdyWriter sendFrame() {
OutFrame frame = new OutFrame(frameCount++);
OutFrame frame = new OutFrame(frameCount++, bytesOut.size());
outFrames.add(frame);
return new SpdyWriter(frame.out);
return spdyWriter;
}
public int getPort() {
@@ -81,6 +83,7 @@ public final class MockSpdyPeer {
InputStream in = socket.getInputStream();
Iterator<OutFrame> outFramesIterator = outFrames.iterator();
byte[] outBytes = bytesOut.toByteArray();
OutFrame nextOutFrame = null;
for (int i = 0; i < frameCount; i++) {
@@ -89,9 +92,17 @@ public final class MockSpdyPeer {
}
if (nextOutFrame != null && nextOutFrame.sequence == i) {
int start = nextOutFrame.start;
int end;
if (outFramesIterator.hasNext()) {
nextOutFrame = outFramesIterator.next();
end = nextOutFrame.start;
} else {
end = outBytes.length;
}
// write a frame
nextOutFrame.out.writeTo(out);
nextOutFrame = null;
out.write(outBytes, start, end - start);
} else {
// read a frame
@@ -109,10 +120,11 @@ public final class MockSpdyPeer {
private static class OutFrame {
private final int sequence;
private final ByteArrayOutputStream out = new ByteArrayOutputStream();
private final int start;
private OutFrame(int sequence) {
private OutFrame(int sequence, int start) {
this.sequence = sequence;
this.start = start;
}
}

View File

@@ -25,7 +25,10 @@ import static com.squareup.okhttp.internal.net.spdy.SpdyConnection.TYPE_PING;
import static com.squareup.okhttp.internal.net.spdy.SpdyConnection.TYPE_RST_STREAM;
import static com.squareup.okhttp.internal.net.spdy.SpdyConnection.TYPE_SYN_REPLY;
import static com.squareup.okhttp.internal.net.spdy.SpdyConnection.TYPE_SYN_STREAM;
import static com.squareup.okhttp.internal.net.spdy.SpdyStream.RST_FLOW_CONTROL_ERROR;
import static com.squareup.okhttp.internal.net.spdy.SpdyStream.RST_INVALID_STREAM;
import static com.squareup.okhttp.internal.net.spdy.SpdyStream.RST_PROTOCOL_ERROR;
import static com.squareup.okhttp.internal.net.spdy.SpdyStream.RST_REFUSED_STREAM;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -483,24 +486,140 @@ public final class SpdyConnectionTest {
assertEquals(SpdyConnection.FLAG_FIN, synStream.flags);
}
@Test public void remoteDoubleReply() {
// We should get a PROTOCOL ERROR
// TODO
@Test public void remoteDoubleSynReply() throws Exception {
// write the mocking script
peer.acceptFrame();
peer.sendFrame().synReply(0, 1, Arrays.asList("a", "android"));
peer.sendFrame().synReply(0, 1, Arrays.asList("b", "banana"));
peer.acceptFrame();
peer.play();
// play it back
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build();
SpdyStream stream = connection.newStream(Arrays.asList("c", "cola"), true, true);
assertEquals(Arrays.asList("a", "android"), stream.getResponseHeaders());
assertStreamData("", stream.getInputStream());
// verify the peer received what was expected
MockSpdyPeer.InFrame synStream = peer.takeFrame();
assertEquals(TYPE_SYN_STREAM, synStream.type);
MockSpdyPeer.InFrame rstStream = peer.takeFrame();
assertEquals(TYPE_RST_STREAM, rstStream.type);
assertEquals(1, rstStream.streamId);
assertEquals(0, rstStream.flags);
assertEquals(RST_PROTOCOL_ERROR, rstStream.statusCode);
}
@Test public void remoteSendsDataAfterInFinished() {
// We have a bug where we don't fastfoward the stream
// TODO
@Test public void remoteDoubleSynStream() throws Exception {
// write the mocking script
peer.sendFrame().synStream(0, 2, 0, 0, Arrays.asList("a", "android"));
peer.acceptFrame();
peer.sendFrame().synStream(0, 2, 0, 0, Arrays.asList("b", "banana"));
peer.acceptFrame();
peer.play();
// play it back
final AtomicInteger receiveCount = new AtomicInteger();
IncomingStreamHandler handler = new IncomingStreamHandler() {
@Override public void receive(SpdyStream stream) throws IOException {
receiveCount.incrementAndGet();
assertEquals(Arrays.asList("a", "android"), stream.getRequestHeaders());
assertEquals(-1, stream.getRstStatusCode());
stream.reply(Arrays.asList("c", "cola"), true);
}
};
new SpdyConnection.Builder(true, peer.openSocket())
.handler(handler)
.build();
// verify the peer received what was expected
MockSpdyPeer.InFrame reply = peer.takeFrame();
assertEquals(TYPE_SYN_REPLY, reply.type);
MockSpdyPeer.InFrame rstStream = peer.takeFrame();
assertEquals(TYPE_RST_STREAM, rstStream.type);
assertEquals(2, rstStream.streamId);
assertEquals(0, rstStream.flags);
assertEquals(RST_PROTOCOL_ERROR, rstStream.statusCode);
assertEquals(1, receiveCount.intValue());
}
@Test public void remoteSendsTooMuchData() {
// We should send RST_FLOW_CONTROL_ERROR (and fastforward the stream)
// TODO
@Test public void remoteSendsDataAfterInFinished() throws Exception {
// write the mocking script
peer.acceptFrame();
peer.sendFrame().synReply(0, 1, Arrays.asList("a", "android"));
peer.sendFrame().data(SpdyConnection.FLAG_FIN, 1, "robot".getBytes("UTF-8"));
peer.sendFrame().data(SpdyConnection.FLAG_FIN, 1, "c3po".getBytes("UTF-8")); // Ignored.
peer.sendFrame().ping(0, 2); // Ping just to make sure the stream was fastforwarded.
peer.acceptFrame();
peer.play();
// play it back
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build();
SpdyStream stream = connection.newStream(Arrays.asList("b", "banana"), true, true);
assertEquals(Arrays.asList("a", "android"), stream.getResponseHeaders());
assertStreamData("robot", stream.getInputStream());
// verify the peer received what was expected
MockSpdyPeer.InFrame synStream = peer.takeFrame();
assertEquals(TYPE_SYN_STREAM, synStream.type);
MockSpdyPeer.InFrame ping = peer.takeFrame();
assertEquals(TYPE_PING, ping.type);
assertEquals(2, ping.streamId);
assertEquals(0, ping.flags);
}
@Test public void remoteSendsRefusedStreamBeforeReplyHeaders() {
// Calling getResponseHeaders() should throw an IOException if the stream is refused.
// TODO
@Test public void remoteSendsTooMuchData() throws Exception {
// write the mocking script
peer.acceptFrame();
peer.sendFrame().synReply(0, 1, Arrays.asList("b", "banana"));
peer.sendFrame().data(0, 1, new byte[64 * 1024 + 1]);
peer.acceptFrame();
peer.sendFrame().ping(0, 2); // Ping just to make sure the stream was fastforwarded.
peer.acceptFrame();
peer.play();
// play it back
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build();
SpdyStream stream = connection.newStream(Arrays.asList("a", "android"), true, true);
assertEquals(Arrays.asList("b", "banana"), stream.getResponseHeaders());
// verify the peer received what was expected
MockSpdyPeer.InFrame synStream = peer.takeFrame();
assertEquals(TYPE_SYN_STREAM, synStream.type);
MockSpdyPeer.InFrame rstStream = peer.takeFrame();
assertEquals(TYPE_RST_STREAM, rstStream.type);
assertEquals(1, rstStream.streamId);
assertEquals(0, rstStream.flags);
assertEquals(RST_FLOW_CONTROL_ERROR, rstStream.statusCode);
MockSpdyPeer.InFrame ping = peer.takeFrame();
assertEquals(TYPE_PING, ping.type);
assertEquals(2, ping.streamId);
}
@Test public void remoteSendsRefusedStreamBeforeReplyHeaders() throws Exception {
// write the mocking script
peer.acceptFrame();
peer.sendFrame().synReset(1, RST_REFUSED_STREAM);
peer.sendFrame().ping(0, 2);
peer.acceptFrame();
peer.play();
// play it back
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build();
SpdyStream stream = connection.newStream(Arrays.asList("a", "android"), true, true);
try {
stream.getResponseHeaders();
fail();
} catch (IOException expected) {
}
// verify the peer received what was expected
MockSpdyPeer.InFrame synStream = peer.takeFrame();
assertEquals(TYPE_SYN_STREAM, synStream.type);
MockSpdyPeer.InFrame ping = peer.takeFrame();
assertEquals(TYPE_PING, ping.type);
assertEquals(2, ping.streamId);
assertEquals(0, ping.flags);
}
private void writeAndClose(SpdyStream stream, String data) throws IOException {