1
0
mirror of https://github.com/square/okhttp.git synced 2026-01-27 04:22:07 +03:00

Address some todos around stream failures and peer errors.

This commit is contained in:
Jesse Wilson
2012-09-25 17:32:28 -04:00
parent 0bfb878783
commit 68ebb704fb
4 changed files with 150 additions and 48 deletions

View File

@@ -29,6 +29,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import libcore.io.Streams;
import static libcore.net.spdy.Threads.newThreadFactory;
@@ -162,16 +163,11 @@ public final class SpdyConnection implements Closeable {
}
private synchronized SpdyStream getStream(int id) {
SpdyStream stream = streams.get(id);
if (stream == null) {
// TODO: rst stream
throw new UnsupportedOperationException("TODO " + id + "; " + streams);
}
return stream;
return streams.get(id);
}
synchronized void removeStream(int streamId) {
streams.remove(streamId);
synchronized SpdyStream removeStream(int streamId) {
return streams.remove(streamId);
}
/**
@@ -185,8 +181,8 @@ public final class SpdyConnection implements Closeable {
public SpdyStream newStream(List<String> requestHeaders, boolean out, boolean in)
throws IOException {
int flags = (out ? 0 : FLAG_FIN) | (in ? 0 : FLAG_UNIDIRECTIONAL);
int associatedStreamId = 0; // TODO
int priority = 0; // TODO
int associatedStreamId = 0; // TODO: permit the caller to specify an associated stream.
int priority = 0; // TODO: permit the caller to specify a priority.
SpdyStream stream;
int streamId;
@@ -211,7 +207,7 @@ public final class SpdyConnection implements Closeable {
void writeSynReply(int streamId, List<String> alternating) throws IOException {
synchronized (spdyWriter) {
int flags = 0; // TODO
int flags = 0; // TODO: permit the caller to send FLAG_FIN
spdyWriter.flags = flags;
spdyWriter.id = streamId;
spdyWriter.nameValueBlock = alternating;
@@ -239,8 +235,7 @@ public final class SpdyConnection implements Closeable {
void writeSynReset(int streamId, int statusCode) throws IOException {
synchronized (spdyWriter) {
int flags = 0; // TODO
spdyWriter.flags = flags;
spdyWriter.flags = 0;
spdyWriter.id = streamId;
spdyWriter.statusCode = statusCode;
spdyWriter.synReset();
@@ -294,6 +289,11 @@ public final class SpdyConnection implements Closeable {
}
@Override public void close() throws IOException {
close(null);
}
private synchronized void close(Throwable reason) throws IOException {
// TODO: forward 'reason' to forced closed streams?
// TODO: graceful close; send RST frames
// TODO: close all streams to release waiting readers
writeExecutor.shutdown();
@@ -337,12 +337,17 @@ public final class SpdyConnection implements Closeable {
private class Reader implements Runnable {
@Override public void run() {
Throwable failure = null;
try {
while (readFrame()) {
}
close();
} catch (Throwable e) {
e.printStackTrace(); // TODO
failure = e;
}
try {
close(failure);
} catch (IOException ignored) {
}
}
@@ -355,21 +360,26 @@ public final class SpdyConnection implements Closeable {
return false;
case TYPE_DATA:
getStream(spdyReader.id)
.receiveData(spdyReader.in, spdyReader.flags, spdyReader.length);
SpdyStream dataStream = getStream(spdyReader.id);
if (dataStream != null) {
dataStream.receiveData(spdyReader.in, spdyReader.flags, spdyReader.length);
} else {
writeSynResetLater(spdyReader.id, SpdyStream.RST_INVALID_STREAM);
Streams.skipByReading(spdyReader.in, spdyReader.length);
}
return true;
case TYPE_SYN_STREAM:
final SpdyStream stream = new SpdyStream(spdyReader.id, SpdyConnection.this,
final SpdyStream synStream = new SpdyStream(spdyReader.id, SpdyConnection.this,
spdyReader.nameValueBlock, spdyReader.flags);
SpdyStream previous = streams.put(spdyReader.id, stream);
SpdyStream previous = streams.put(spdyReader.id, synStream);
if (previous != null) {
previous.close(SpdyStream.RST_PROTOCOL_ERROR);
}
callbackExecutor.execute(new Runnable() {
@Override public void run() {
try {
handler.receive(stream);
handler.receive(synStream);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -378,18 +388,27 @@ public final class SpdyConnection implements Closeable {
return true;
case TYPE_SYN_REPLY:
// TODO: honor flags
getStream(spdyReader.id).receiveReply(spdyReader.nameValueBlock);
SpdyStream replyStream = getStream(spdyReader.id);
if (replyStream != null) {
// TODO: honor incoming FLAG_FIN.
replyStream.receiveReply(spdyReader.nameValueBlock);
} else {
writeSynResetLater(spdyReader.id, SpdyStream.RST_INVALID_STREAM);
}
return true;
case TYPE_RST_STREAM:
getStream(spdyReader.id).receiveRstStream(spdyReader.statusCode);
SpdyStream rstStream = removeStream(spdyReader.id);
if (rstStream != null) {
rstStream.receiveRstStream(spdyReader.statusCode);
}
return true;
case SpdyConnection.TYPE_SETTINGS:
int numberOfEntries = spdyReader.in.readInt();
if (spdyReader.length != 4 + numberOfEntries * 8) {
// TODO: DIE
if (spdyReader.length != 4 + 8 * numberOfEntries) {
throw new IOException("TYPE_SETTINGS frame length is inconsistent: "
+ spdyReader.length + " != 4 + 8 * " + numberOfEntries);
}
if ((spdyReader.flags & FLAG_SETTINGS_CLEAR_PREVIOUSLY_PERSISTED_SETTINGS) != 0) {
clearSettings();
@@ -427,8 +446,7 @@ public final class SpdyConnection implements Closeable {
throw new UnsupportedOperationException();
default:
// TODO: throw IOException here?
return false;
throw new IOException("Unexpected frame: " + Integer.toHexString(spdyReader.type));
}
}
}

View File

@@ -87,7 +87,7 @@ final class SpdyReader {
try {
w1 = in.readInt();
} catch (EOFException e) {
return SpdyConnection.TYPE_EOF;
return type = SpdyConnection.TYPE_EOF;
}
int w2 = in.readInt();
@@ -125,7 +125,7 @@ final class SpdyReader {
}
} else {
id = w1 & 0x7fffffff;
return SpdyConnection.TYPE_DATA;
return type = SpdyConnection.TYPE_DATA;
}
}

View File

@@ -118,9 +118,13 @@ public final class SpdyStream {
/**
* Returns the reason why this stream was closed, or -1 if it closed
* normally or has not yet been closed.
* normally or has not yet been closed. Valid reasons are {@link
* #RST_PROTOCOL_ERROR}, {@link #RST_INVALID_STREAM}, {@link
* #RST_REFUSED_STREAM}, {@link #RST_UNSUPPORTED_VERSION}, {@link
* #RST_CANCEL}, {@link #RST_INTERNAL_ERROR} and {@link
* #RST_FLOW_CONTROL_ERROR}.
*/
public synchronized int getRstStatusCode() { // TODO: rename this?
public synchronized int getRstStatusCode() {
return rstStatusCode;
}

View File

@@ -16,15 +16,21 @@
package libcore.net.spdy;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.TestCase;
import static libcore.net.spdy.SpdyConnection.TYPE_PING;
import static libcore.net.spdy.SpdyConnection.TYPE_RST_STREAM;
import static libcore.net.spdy.SpdyConnection.TYPE_SYN_REPLY;
import static libcore.net.spdy.SpdyConnection.TYPE_SYN_STREAM;
import static libcore.net.spdy.SpdyStream.RST_INVALID_STREAM;
public final class SpdyConnectionTest extends TestCase {
private static final IncomingStreamHandler REJECT_INCOMING_STREAMS
= new IncomingStreamHandler() {
@@ -51,17 +57,13 @@ public final class SpdyConnectionTest extends TestCase {
// play it back
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build();
SpdyStream stream = connection.newStream(Arrays.asList("b", "banana"), true, true);
List<String> responseHeaders = stream.getResponseHeaders();
assertEquals(Arrays.asList("a", "android"), responseHeaders);
BufferedReader reader = new BufferedReader(new InputStreamReader(stream.getInputStream()));
assertEquals("robot", reader.readLine());
assertEquals(null, reader.readLine());
OutputStream out = stream.getOutputStream();
out.write("c3po".getBytes("UTF-8"));
out.close();
assertEquals(Arrays.asList("a", "android"), stream.getResponseHeaders());
assertStreamData("robot", stream.getInputStream());
writeAndClose(stream, "c3po");
// verify the peer received what was expected
MockSpdyPeer.InFrame synStream = peer.takeFrame();
assertEquals(TYPE_SYN_STREAM, synStream.reader.type);
assertEquals(0, synStream.reader.flags);
assertEquals(1, synStream.reader.id);
assertEquals(0, synStream.reader.associatedStreamId);
@@ -82,11 +84,14 @@ public final class SpdyConnectionTest extends TestCase {
peer.play();
// play it back
final AtomicInteger receiveCount = new AtomicInteger();
IncomingStreamHandler handler = new IncomingStreamHandler() {
@Override public void receive(SpdyStream stream) throws IOException {
receiveCount.incrementAndGet();
assertEquals(Arrays.asList("a", "android"), stream.getRequestHeaders());
assertEquals(-1, stream.getRstStatusCode());
stream.reply(Arrays.asList("b", "banana"));
}
};
new SpdyConnection.Builder(true, peer.openSocket())
@@ -94,11 +99,13 @@ public final class SpdyConnectionTest extends TestCase {
.build();
// verify the peer received what was expected
MockSpdyPeer.InFrame synStream = peer.takeFrame();
assertEquals(0, synStream.reader.flags);
assertEquals(2, synStream.reader.id);
assertEquals(0, synStream.reader.associatedStreamId);
assertEquals(Arrays.asList("b", "banana"), synStream.reader.nameValueBlock);
MockSpdyPeer.InFrame reply = peer.takeFrame();
assertEquals(TYPE_SYN_REPLY, reply.reader.type);
assertEquals(0, reply.reader.flags);
assertEquals(2, reply.reader.id);
assertEquals(0, reply.reader.associatedStreamId);
assertEquals(Arrays.asList("b", "banana"), reply.reader.nameValueBlock);
assertEquals(1, receiveCount.get());
}
public void testServerPingsClient() throws Exception {
@@ -114,6 +121,7 @@ public final class SpdyConnectionTest extends TestCase {
// verify the peer received what was expected
MockSpdyPeer.InFrame ping = peer.takeFrame();
assertEquals(TYPE_PING, ping.reader.type);
assertEquals(0, ping.reader.flags);
assertEquals(2, ping.reader.id);
}
@@ -134,6 +142,7 @@ public final class SpdyConnectionTest extends TestCase {
// verify the peer received what was expected
MockSpdyPeer.InFrame pingFrame = peer.takeFrame();
assertEquals(TYPE_PING, pingFrame.reader.type);
assertEquals(0, pingFrame.reader.flags);
assertEquals(1, pingFrame.reader.id);
}
@@ -148,7 +157,7 @@ public final class SpdyConnectionTest extends TestCase {
peer.play();
// play it back
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket())
new SpdyConnection.Builder(true, peer.openSocket())
.handler(REJECT_INCOMING_STREAMS)
.build();
@@ -180,4 +189,75 @@ public final class SpdyConnectionTest extends TestCase {
assertEquals(10, connection.peerMaxConcurrentStreams);
}
}
public void testBogusDataFrameDoesNotDisruptConnection() throws Exception {
// write the mocking script
SpdyWriter unexpectedData = peer.sendFrame();
unexpectedData.flags = SpdyConnection.FLAG_FIN;
unexpectedData.id = 42;
unexpectedData.data("bogus".getBytes("UTF-8"));
peer.acceptFrame(); // RST_STREAM
peer.sendPing(2);
peer.acceptFrame(); // PING
peer.play();
// play it back
new SpdyConnection.Builder(true, peer.openSocket())
.handler(REJECT_INCOMING_STREAMS)
.build();
// verify the peer received what was expected
MockSpdyPeer.InFrame rstStream = peer.takeFrame();
assertEquals(TYPE_RST_STREAM, rstStream.reader.type);
assertEquals(0, rstStream.reader.flags);
assertEquals(8, rstStream.reader.length);
assertEquals(42, rstStream.reader.id);
assertEquals(RST_INVALID_STREAM, rstStream.reader.statusCode);
MockSpdyPeer.InFrame ping = peer.takeFrame();
assertEquals(2, ping.reader.id);
}
public void testBogusReplyFrameDoesNotDisruptConnection() throws Exception {
// write the mocking script
SpdyWriter unexpectedReply = peer.sendFrame();
unexpectedReply.nameValueBlock = Arrays.asList("a", "android");
unexpectedReply.flags = 0;
unexpectedReply.id = 42;
unexpectedReply.synReply();
peer.acceptFrame(); // RST_STREAM
peer.sendPing(2);
peer.acceptFrame(); // PING
peer.play();
// play it back
new SpdyConnection.Builder(true, peer.openSocket())
.handler(REJECT_INCOMING_STREAMS)
.build();
// verify the peer received what was expected
MockSpdyPeer.InFrame rstStream = peer.takeFrame();
assertEquals(TYPE_RST_STREAM, rstStream.reader.type);
assertEquals(0, rstStream.reader.flags);
assertEquals(8, rstStream.reader.length);
assertEquals(42, rstStream.reader.id);
assertEquals(RST_INVALID_STREAM, rstStream.reader.statusCode);
MockSpdyPeer.InFrame ping = peer.takeFrame();
assertEquals(2, ping.reader.id);
}
private void writeAndClose(SpdyStream stream, String data) throws IOException {
OutputStream out = stream.getOutputStream();
out.write(data.getBytes("UTF-8"));
out.close();
}
private void assertStreamData(String expected, InputStream inputStream) throws IOException {
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
byte[] buffer = new byte[1024];
for (int count; (count = inputStream.read(buffer)) != -1; ) {
bytesOut.write(buffer, 0, count);
}
String actual = bytesOut.toString("UTF-8");
assertEquals(expected, actual);
}
}