mirror of
https://github.com/square/okhttp.git
synced 2026-01-17 08:42:25 +03:00
Introduce a buffer in SpdyStream's outgoing frames.
Previously we'd consistently send 2048 byte frames because that was Okio's default segment size. This increases OkHttp's typical segment size to the 16 KiB frame size that all HTTP/2 endpoints support.
This commit is contained in:
@@ -941,8 +941,9 @@ public final class Spdy3ConnectionTest {
|
||||
sink.write(new Buffer().writeUtf8("abcde"), 5);
|
||||
stream.writeTimeout().timeout(500, TimeUnit.MILLISECONDS);
|
||||
long startNanos = System.nanoTime();
|
||||
sink.write(new Buffer().writeUtf8("f"), 1);
|
||||
try {
|
||||
sink.write(new Buffer().writeUtf8("f"), 1); // This will time out waiting on the write window.
|
||||
sink.flush(); // This will time out waiting on the write window.
|
||||
fail();
|
||||
} catch (InterruptedIOException expected) {
|
||||
}
|
||||
@@ -957,6 +958,31 @@ public final class Spdy3ConnectionTest {
|
||||
assertEquals(TYPE_RST_STREAM, peer.takeFrame().type);
|
||||
}
|
||||
|
||||
@Test public void outgoingWritesAreBatched() throws Exception {
|
||||
// write the mocking script
|
||||
peer.acceptFrame(); // SYN_STREAM
|
||||
peer.sendFrame().synReply(false, 1, headerEntries("a", "android"));
|
||||
peer.acceptFrame(); // DATA
|
||||
peer.play();
|
||||
|
||||
// play it back
|
||||
SpdyConnection connection = connection(peer, SPDY3);
|
||||
SpdyStream stream = connection.newStream(headerEntries("b", "banana"), true, true);
|
||||
|
||||
// two outgoing writes
|
||||
Sink sink = stream.getSink();
|
||||
sink.write(new Buffer().writeUtf8("abcde"), 5);
|
||||
sink.write(new Buffer().writeUtf8("fghij"), 5);
|
||||
sink.close();
|
||||
|
||||
// verify the peer received one incoming frame
|
||||
assertEquals(TYPE_HEADERS, peer.takeFrame().type);
|
||||
MockSpdyPeer.InFrame data = peer.takeFrame();
|
||||
assertEquals(TYPE_DATA, data.type);
|
||||
assertTrue(Arrays.equals("abcdefghij".getBytes("UTF-8"), data.data));
|
||||
assertTrue(data.inFinished);
|
||||
}
|
||||
|
||||
@Test public void headers() throws Exception {
|
||||
// write the mocking script
|
||||
peer.acceptFrame(); // SYN_STREAM
|
||||
|
||||
@@ -53,7 +53,6 @@ public final class SpdyStream {
|
||||
|
||||
private final int id;
|
||||
private final SpdyConnection connection;
|
||||
private long readTimeoutMillis = 0;
|
||||
|
||||
/** Headers sent by the stream initiator. Immutable and non null. */
|
||||
private final List<Header> requestHeaders;
|
||||
@@ -468,6 +467,14 @@ public final class SpdyStream {
|
||||
* thread safe.
|
||||
*/
|
||||
final class SpdyDataSink implements Sink {
|
||||
private static final long EMIT_BUFFER_SIZE = 16384;
|
||||
|
||||
/**
|
||||
* Buffer of outgoing data. This batches writes of small writes into this sink as larges
|
||||
* frames written to the outgoing connection. Batching saves the (small) framing overhead.
|
||||
*/
|
||||
private final Buffer sendBuffer = new Buffer();
|
||||
|
||||
private boolean closed;
|
||||
|
||||
/**
|
||||
@@ -478,26 +485,34 @@ public final class SpdyStream {
|
||||
|
||||
@Override public void write(Buffer source, long byteCount) throws IOException {
|
||||
assert (!Thread.holdsLock(SpdyStream.this));
|
||||
while (byteCount > 0) {
|
||||
long toWrite;
|
||||
synchronized (SpdyStream.this) {
|
||||
writeTimeout.enter();
|
||||
try {
|
||||
while (bytesLeftInWriteWindow <= 0 && !finished && !closed && errorCode == null) {
|
||||
waitForIo(); // Wait until we receive a WINDOW_UPDATE.
|
||||
}
|
||||
} finally {
|
||||
writeTimeout.exitAndThrowIfTimedOut();
|
||||
}
|
||||
sendBuffer.write(source, byteCount);
|
||||
while (sendBuffer.size() >= EMIT_BUFFER_SIZE) {
|
||||
emitDataFrame(false);
|
||||
}
|
||||
}
|
||||
|
||||
checkOutNotClosed(); // Kick out if the stream was reset or closed while waiting.
|
||||
toWrite = Math.min(bytesLeftInWriteWindow, byteCount);
|
||||
bytesLeftInWriteWindow -= toWrite;
|
||||
/**
|
||||
* Emit a single data frame to the connection. The frame's size be limited by this stream's
|
||||
* write window. This method will block until the write window is nonempty.
|
||||
*/
|
||||
private void emitDataFrame(boolean outFinished) throws IOException {
|
||||
long toWrite;
|
||||
synchronized (SpdyStream.this) {
|
||||
writeTimeout.enter();
|
||||
try {
|
||||
while (bytesLeftInWriteWindow <= 0 && !finished && !closed && errorCode == null) {
|
||||
waitForIo(); // Wait until we receive a WINDOW_UPDATE.
|
||||
}
|
||||
} finally {
|
||||
writeTimeout.exitAndThrowIfTimedOut();
|
||||
}
|
||||
|
||||
byteCount -= toWrite;
|
||||
connection.writeData(id, false, source, toWrite);
|
||||
checkOutNotClosed(); // Kick out if the stream was reset or closed while waiting.
|
||||
toWrite = Math.min(bytesLeftInWriteWindow, sendBuffer.size());
|
||||
bytesLeftInWriteWindow -= toWrite;
|
||||
}
|
||||
|
||||
connection.writeData(id, outFinished && toWrite == sendBuffer.size(), sendBuffer, toWrite);
|
||||
}
|
||||
|
||||
@Override public void flush() throws IOException {
|
||||
@@ -505,6 +520,9 @@ public final class SpdyStream {
|
||||
synchronized (SpdyStream.this) {
|
||||
checkOutNotClosed();
|
||||
}
|
||||
while (sendBuffer.size() > 0) {
|
||||
emitDataFrame(false);
|
||||
}
|
||||
connection.flush();
|
||||
}
|
||||
|
||||
@@ -518,7 +536,15 @@ public final class SpdyStream {
|
||||
if (closed) return;
|
||||
}
|
||||
if (!sink.finished) {
|
||||
connection.writeData(id, true, null, 0);
|
||||
// Emit the remaining data, setting the END_STREAM flag on the last frame.
|
||||
if (sendBuffer.size() > 0) {
|
||||
while (sendBuffer.size() > 0) {
|
||||
emitDataFrame(true);
|
||||
}
|
||||
} else {
|
||||
// Send an empty frame just so we can set the END_STREAM flag.
|
||||
connection.writeData(id, true, null, 0);
|
||||
}
|
||||
}
|
||||
synchronized (SpdyStream.this) {
|
||||
closed = true;
|
||||
|
||||
Reference in New Issue
Block a user