1
0
mirror of https://github.com/square/okhttp.git synced 2026-01-27 04:22:07 +03:00

Fix synchronization todos in SpdyStream.

This code had bugs where ongoing reads could prevent
writes from occurring. This update adds some asserts
to prevent synchronization errors.
This commit is contained in:
jwilson
2012-12-29 13:42:52 -07:00
parent d10c20a7eb
commit e4a4cbae4f
4 changed files with 71 additions and 48 deletions

View File

@@ -30,9 +30,9 @@ public interface IncomingStreamHandler {
/**
* Handle a new stream from this connection's peer. Implementations should
* respond by either {@link SpdyStream#reply(java.util.List) replying to the
* stream} or {@link SpdyStream#close(int) closing it}. This response does
* not need to be synchronous.
* respond by either {@link SpdyStream#reply replying to the stream} or
* {@link SpdyStream#close closing it}. This response does not need to be
* synchronous.
*/
void receive(SpdyStream stream) throws IOException;
}

View File

@@ -156,9 +156,7 @@ public final class SpdyConnection implements Closeable {
}
void writeSynReply(int streamId, int flags, List<String> alternating) throws IOException {
synchronized (spdyWriter) {
spdyWriter.synReply(flags, streamId, alternating);
}
spdyWriter.synReply(flags, streamId, alternating);
}
/** Writes a complete data frame. */
@@ -180,9 +178,7 @@ public final class SpdyConnection implements Closeable {
}
void writeSynReset(int streamId, int statusCode) throws IOException {
synchronized (spdyWriter) {
spdyWriter.synReset(streamId, statusCode);
}
spdyWriter.synReset(streamId, statusCode);
}
/**
@@ -229,9 +225,7 @@ public final class SpdyConnection implements Closeable {
* Sends a noop frame to the peer.
*/
public void noop() throws IOException {
synchronized (spdyWriter) {
spdyWriter.noop();
}
spdyWriter.noop();
}
public void flush() throws IOException {

View File

@@ -129,6 +129,7 @@ public final class SpdyStream {
* to the remote peer. Corresponds to {@code FLAG_FIN}.
*/
public void reply(List<String> responseHeaders, boolean out) throws IOException {
assert (!Thread.holdsLock(SpdyStream.this));
int flags = 0;
synchronized (this) {
if (responseHeaders == null) {
@@ -175,6 +176,7 @@ public final class SpdyStream {
* Abnormally terminate this stream.
*/
public void close(int rstStatusCode) throws IOException {
assert (!Thread.holdsLock(this));
synchronized (this) {
// TODO: no-op if inFinished == true and outFinished == true ?
if (this.rstStatusCode != -1) {
@@ -197,12 +199,16 @@ public final class SpdyStream {
notifyAll();
}
// TODO: locking here broken. Writing threads are blocked by potentially slow reads.
synchronized void receiveData(InputStream in, int flags, int length) throws IOException {
void receiveData(InputStream in, int flags, int length) throws IOException {
assert (!Thread.holdsLock(SpdyStream.this));
this.in.receive(in, length);
if ((flags & SpdyConnection.FLAG_FIN) != 0) {
if ((flags & SpdyConnection.FLAG_FIN) == 0) {
return;
}
// This is the last incoming data in the stream.
synchronized (this) {
this.in.finished = true;
streamStateChanged();
notifyAll();
}
}
@@ -320,18 +326,28 @@ public final class SpdyStream {
}
void receive(InputStream in, int byteCount) throws IOException {
if (finished) {
return; // ignore this; probably a benign race
}
if (byteCount == 0) {
return;
assert (!Thread.holdsLock(SpdyStream.this));
int pos;
int limit;
int firstNewByte;
synchronized (SpdyStream.this) {
if (finished) {
return; // ignore this; probably a benign race
}
if (byteCount == 0) {
return;
}
if (byteCount > buffer.length - available()) {
throw new IOException(); // TODO: RST the stream
}
pos = this.pos;
firstNewByte = limit = this.limit;
}
if (byteCount > buffer.length - available()) {
throw new IOException(); // TODO: RST the stream
}
// fill [limit..buffer.length)
// Fill the buffer without holding any locks. First fill [limit..buffer.length) if that
// won't overwrite unread data. Then fill [limit..pos). We can't hold a lock, otherwise
// writes will be blocked until reads complete.
if (pos < limit) {
int firstCopyCount = Math.min(byteCount, buffer.length - limit);
Streams.readFully(in, buffer, limit, firstCopyCount);
@@ -341,24 +357,26 @@ public final class SpdyStream {
limit = 0;
}
}
// fill [limit..pos)
if (byteCount > 0) {
Streams.readFully(in, buffer, limit, byteCount);
limit += byteCount;
}
if (pos == -1) {
pos = 0;
SpdyStream.this.notifyAll();
synchronized (SpdyStream.this) {
// Update the new limit, and mark the position as readable if necessary.
this.limit = limit;
if (this.pos == -1) {
this.pos = firstNewByte;
SpdyStream.this.notifyAll();
}
}
}
@Override public void close() throws IOException {
synchronized (SpdyStream.this) {
closed = true;
streamStateChanged();
}
cancelStreamIfNecessary();
}
private void checkNotClosed() throws IOException {
@@ -368,14 +386,20 @@ public final class SpdyStream {
}
}
private synchronized void streamStateChanged() throws IOException {
// If we closed the input stream before bytes ran out, we want to cancel
// it. But we can only cancel it once the output stream's bytes have all
// been sent, otherwise we'll terminate that innocent bystander.
if (in.closed && !in.finished && (out.finished || out.closed)) {
in.finished = true;
SpdyStream.this.close(RST_CANCEL);
private void cancelStreamIfNecessary() throws IOException {
assert (!Thread.holdsLock(SpdyStream.this));
synchronized (this) {
if (in.closed && !in.finished && (out.finished || out.closed)) {
// RST this stream to prevent additional data from being sent. This is safe because
// the input stream is closed (we won't use any further bytes) and the output stream
// is either finished or closed (so RSTing both streams won't cause harm).
in.finished = true;
} else {
// We shouldn't cancel this stream.
return;
}
}
SpdyStream.this.close(RST_CANCEL);
}
/**
@@ -400,6 +424,7 @@ public final class SpdyStream {
}
@Override public void write(byte[] bytes, int offset, int count) throws IOException {
assert (!Thread.holdsLock(SpdyStream.this));
checkOffsetAndCount(bytes.length, offset, count);
checkNotClosed();
@@ -416,6 +441,7 @@ public final class SpdyStream {
}
@Override public void flush() throws IOException {
assert (!Thread.holdsLock(SpdyStream.this));
checkNotClosed();
if (pos > DATA_FRAME_HEADER_LENGTH) {
writeFrame(false);
@@ -424,6 +450,7 @@ public final class SpdyStream {
}
@Override public void close() throws IOException {
assert (!Thread.holdsLock(SpdyStream.this));
synchronized (SpdyStream.this) {
if (closed) {
return;
@@ -432,10 +459,11 @@ public final class SpdyStream {
}
writeFrame(true);
connection.flush();
streamStateChanged();
cancelStreamIfNecessary();
}
private void writeFrame(boolean last) throws IOException {
assert (!Thread.holdsLock(SpdyStream.this));
int flags = 0;
if (last) {
flags |= SpdyConnection.FLAG_FIN;

View File

@@ -42,8 +42,8 @@ final class SpdyWriter {
Platform.get().newDeflaterOutputStream(nameValueBlockBuffer, deflater, true));
}
public void synStream(int flags, int streamId, int associatedStreamId, int priority,
List<String> nameValueBlock) throws IOException {
public synchronized void synStream(int flags, int streamId, int associatedStreamId,
int priority, List<String> nameValueBlock) throws IOException {
writeNameValueBlockToBuffer(nameValueBlock);
int length = 10 + nameValueBlockBuffer.size();
int type = SpdyConnection.TYPE_SYN_STREAM;
@@ -58,7 +58,8 @@ final class SpdyWriter {
out.flush();
}
public void synReply(int flags, int streamId, List<String> nameValueBlock) throws IOException {
public synchronized void synReply(
int flags, int streamId, List<String> nameValueBlock) throws IOException {
writeNameValueBlockToBuffer(nameValueBlock);
int type = SpdyConnection.TYPE_SYN_REPLY;
int length = nameValueBlockBuffer.size() + 6;
@@ -72,7 +73,7 @@ final class SpdyWriter {
out.flush();
}
public void synReset(int streamId, int statusCode) throws IOException {
public synchronized void synReset(int streamId, int statusCode) throws IOException {
int flags = 0;
int type = SpdyConnection.TYPE_RST_STREAM;
int length = 8;
@@ -83,7 +84,7 @@ final class SpdyWriter {
out.flush();
}
public void data(int flags, int streamId, byte[] data) throws IOException {
public synchronized void data(int flags, int streamId, byte[] data) throws IOException {
int length = data.length;
out.writeInt(streamId & 0x7fffffff);
out.writeInt((flags & 0xff) << 24 | length & 0xffffff);
@@ -102,7 +103,7 @@ final class SpdyWriter {
nameValueBlockOut.flush();
}
public void settings(int flags, Settings settings) throws IOException {
public synchronized void settings(int flags, Settings settings) throws IOException {
int type = SpdyConnection.TYPE_SETTINGS;
int size = settings.size();
int length = 4 + size * 8;
@@ -122,7 +123,7 @@ final class SpdyWriter {
out.flush();
}
public void noop() throws IOException {
public synchronized void noop() throws IOException {
int type = SpdyConnection.TYPE_NOOP;
int length = 0;
int flags = 0;
@@ -131,7 +132,7 @@ final class SpdyWriter {
out.flush();
}
public void ping(int flags, int id) throws IOException {
public synchronized void ping(int flags, int id) throws IOException {
int type = SpdyConnection.TYPE_PING;
int length = 4;
out.writeInt(0x80000000 | (SpdyConnection.VERSION & 0x7fff) << 16 | type & 0xffff);