From e4a4cbae4f2f850ec43fa15bee70fa1380d9354c Mon Sep 17 00:00:00 2001 From: jwilson Date: Sat, 29 Dec 2012 13:42:52 -0700 Subject: [PATCH] Fix synchronization todos in SpdyStream. This code had bugs where ongoing reads could prevent writes from occurring. This update adds some asserts to prevent synchronization errors. --- .../net/spdy/IncomingStreamHandler.java | 6 +- .../internal/net/spdy/SpdyConnection.java | 12 +-- .../okhttp/internal/net/spdy/SpdyStream.java | 84 ++++++++++++------- .../okhttp/internal/net/spdy/SpdyWriter.java | 17 ++-- 4 files changed, 71 insertions(+), 48 deletions(-) diff --git a/src/main/java/com/squareup/okhttp/internal/net/spdy/IncomingStreamHandler.java b/src/main/java/com/squareup/okhttp/internal/net/spdy/IncomingStreamHandler.java index ff8ea0e36..25012d560 100644 --- a/src/main/java/com/squareup/okhttp/internal/net/spdy/IncomingStreamHandler.java +++ b/src/main/java/com/squareup/okhttp/internal/net/spdy/IncomingStreamHandler.java @@ -30,9 +30,9 @@ public interface IncomingStreamHandler { /** * Handle a new stream from this connection's peer. Implementations should - * respond by either {@link SpdyStream#reply(java.util.List) replying to the - * stream} or {@link SpdyStream#close(int) closing it}. This response does - * not need to be synchronous. + * respond by either {@link SpdyStream#reply replying to the stream} or + * {@link SpdyStream#close closing it}. This response does not need to be + * synchronous. */ void receive(SpdyStream stream) throws IOException; } diff --git a/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyConnection.java b/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyConnection.java index 12fa547ba..473d3ce2c 100644 --- a/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyConnection.java +++ b/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyConnection.java @@ -156,9 +156,7 @@ public final class SpdyConnection implements Closeable { } void writeSynReply(int streamId, int flags, List alternating) throws IOException { - synchronized (spdyWriter) { - spdyWriter.synReply(flags, streamId, alternating); - } + spdyWriter.synReply(flags, streamId, alternating); } /** Writes a complete data frame. */ @@ -180,9 +178,7 @@ public final class SpdyConnection implements Closeable { } void writeSynReset(int streamId, int statusCode) throws IOException { - synchronized (spdyWriter) { - spdyWriter.synReset(streamId, statusCode); - } + spdyWriter.synReset(streamId, statusCode); } /** @@ -229,9 +225,7 @@ public final class SpdyConnection implements Closeable { * Sends a noop frame to the peer. */ public void noop() throws IOException { - synchronized (spdyWriter) { - spdyWriter.noop(); - } + spdyWriter.noop(); } public void flush() throws IOException { diff --git a/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyStream.java b/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyStream.java index a0e5f3f1c..eedaeb8f5 100644 --- a/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyStream.java +++ b/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyStream.java @@ -129,6 +129,7 @@ public final class SpdyStream { * to the remote peer. Corresponds to {@code FLAG_FIN}. */ public void reply(List responseHeaders, boolean out) throws IOException { + assert (!Thread.holdsLock(SpdyStream.this)); int flags = 0; synchronized (this) { if (responseHeaders == null) { @@ -175,6 +176,7 @@ public final class SpdyStream { * Abnormally terminate this stream. */ public void close(int rstStatusCode) throws IOException { + assert (!Thread.holdsLock(this)); synchronized (this) { // TODO: no-op if inFinished == true and outFinished == true ? if (this.rstStatusCode != -1) { @@ -197,12 +199,16 @@ public final class SpdyStream { notifyAll(); } - // TODO: locking here broken. Writing threads are blocked by potentially slow reads. - synchronized void receiveData(InputStream in, int flags, int length) throws IOException { + void receiveData(InputStream in, int flags, int length) throws IOException { + assert (!Thread.holdsLock(SpdyStream.this)); this.in.receive(in, length); - if ((flags & SpdyConnection.FLAG_FIN) != 0) { + if ((flags & SpdyConnection.FLAG_FIN) == 0) { + return; + } + + // This is the last incoming data in the stream. + synchronized (this) { this.in.finished = true; - streamStateChanged(); notifyAll(); } } @@ -320,18 +326,28 @@ public final class SpdyStream { } void receive(InputStream in, int byteCount) throws IOException { - if (finished) { - return; // ignore this; probably a benign race - } - if (byteCount == 0) { - return; + assert (!Thread.holdsLock(SpdyStream.this)); + + int pos; + int limit; + int firstNewByte; + synchronized (SpdyStream.this) { + if (finished) { + return; // ignore this; probably a benign race + } + if (byteCount == 0) { + return; + } + if (byteCount > buffer.length - available()) { + throw new IOException(); // TODO: RST the stream + } + pos = this.pos; + firstNewByte = limit = this.limit; } - if (byteCount > buffer.length - available()) { - throw new IOException(); // TODO: RST the stream - } - - // fill [limit..buffer.length) + // Fill the buffer without holding any locks. First fill [limit..buffer.length) if that + // won't overwrite unread data. Then fill [limit..pos). We can't hold a lock, otherwise + // writes will be blocked until reads complete. if (pos < limit) { int firstCopyCount = Math.min(byteCount, buffer.length - limit); Streams.readFully(in, buffer, limit, firstCopyCount); @@ -341,24 +357,26 @@ public final class SpdyStream { limit = 0; } } - - // fill [limit..pos) if (byteCount > 0) { Streams.readFully(in, buffer, limit, byteCount); limit += byteCount; } - if (pos == -1) { - pos = 0; - SpdyStream.this.notifyAll(); + synchronized (SpdyStream.this) { + // Update the new limit, and mark the position as readable if necessary. + this.limit = limit; + if (this.pos == -1) { + this.pos = firstNewByte; + SpdyStream.this.notifyAll(); + } } } @Override public void close() throws IOException { synchronized (SpdyStream.this) { closed = true; - streamStateChanged(); } + cancelStreamIfNecessary(); } private void checkNotClosed() throws IOException { @@ -368,14 +386,20 @@ public final class SpdyStream { } } - private synchronized void streamStateChanged() throws IOException { - // If we closed the input stream before bytes ran out, we want to cancel - // it. But we can only cancel it once the output stream's bytes have all - // been sent, otherwise we'll terminate that innocent bystander. - if (in.closed && !in.finished && (out.finished || out.closed)) { - in.finished = true; - SpdyStream.this.close(RST_CANCEL); + private void cancelStreamIfNecessary() throws IOException { + assert (!Thread.holdsLock(SpdyStream.this)); + synchronized (this) { + if (in.closed && !in.finished && (out.finished || out.closed)) { + // RST this stream to prevent additional data from being sent. This is safe because + // the input stream is closed (we won't use any further bytes) and the output stream + // is either finished or closed (so RSTing both streams won't cause harm). + in.finished = true; + } else { + // We shouldn't cancel this stream. + return; + } } + SpdyStream.this.close(RST_CANCEL); } /** @@ -400,6 +424,7 @@ public final class SpdyStream { } @Override public void write(byte[] bytes, int offset, int count) throws IOException { + assert (!Thread.holdsLock(SpdyStream.this)); checkOffsetAndCount(bytes.length, offset, count); checkNotClosed(); @@ -416,6 +441,7 @@ public final class SpdyStream { } @Override public void flush() throws IOException { + assert (!Thread.holdsLock(SpdyStream.this)); checkNotClosed(); if (pos > DATA_FRAME_HEADER_LENGTH) { writeFrame(false); @@ -424,6 +450,7 @@ public final class SpdyStream { } @Override public void close() throws IOException { + assert (!Thread.holdsLock(SpdyStream.this)); synchronized (SpdyStream.this) { if (closed) { return; @@ -432,10 +459,11 @@ public final class SpdyStream { } writeFrame(true); connection.flush(); - streamStateChanged(); + cancelStreamIfNecessary(); } private void writeFrame(boolean last) throws IOException { + assert (!Thread.holdsLock(SpdyStream.this)); int flags = 0; if (last) { flags |= SpdyConnection.FLAG_FIN; diff --git a/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyWriter.java b/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyWriter.java index 281c3e66c..98b588ff2 100644 --- a/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyWriter.java +++ b/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyWriter.java @@ -42,8 +42,8 @@ final class SpdyWriter { Platform.get().newDeflaterOutputStream(nameValueBlockBuffer, deflater, true)); } - public void synStream(int flags, int streamId, int associatedStreamId, int priority, - List nameValueBlock) throws IOException { + public synchronized void synStream(int flags, int streamId, int associatedStreamId, + int priority, List nameValueBlock) throws IOException { writeNameValueBlockToBuffer(nameValueBlock); int length = 10 + nameValueBlockBuffer.size(); int type = SpdyConnection.TYPE_SYN_STREAM; @@ -58,7 +58,8 @@ final class SpdyWriter { out.flush(); } - public void synReply(int flags, int streamId, List nameValueBlock) throws IOException { + public synchronized void synReply( + int flags, int streamId, List nameValueBlock) throws IOException { writeNameValueBlockToBuffer(nameValueBlock); int type = SpdyConnection.TYPE_SYN_REPLY; int length = nameValueBlockBuffer.size() + 6; @@ -72,7 +73,7 @@ final class SpdyWriter { out.flush(); } - public void synReset(int streamId, int statusCode) throws IOException { + public synchronized void synReset(int streamId, int statusCode) throws IOException { int flags = 0; int type = SpdyConnection.TYPE_RST_STREAM; int length = 8; @@ -83,7 +84,7 @@ final class SpdyWriter { out.flush(); } - public void data(int flags, int streamId, byte[] data) throws IOException { + public synchronized void data(int flags, int streamId, byte[] data) throws IOException { int length = data.length; out.writeInt(streamId & 0x7fffffff); out.writeInt((flags & 0xff) << 24 | length & 0xffffff); @@ -102,7 +103,7 @@ final class SpdyWriter { nameValueBlockOut.flush(); } - public void settings(int flags, Settings settings) throws IOException { + public synchronized void settings(int flags, Settings settings) throws IOException { int type = SpdyConnection.TYPE_SETTINGS; int size = settings.size(); int length = 4 + size * 8; @@ -122,7 +123,7 @@ final class SpdyWriter { out.flush(); } - public void noop() throws IOException { + public synchronized void noop() throws IOException { int type = SpdyConnection.TYPE_NOOP; int length = 0; int flags = 0; @@ -131,7 +132,7 @@ final class SpdyWriter { out.flush(); } - public void ping(int flags, int id) throws IOException { + public synchronized void ping(int flags, int id) throws IOException { int type = SpdyConnection.TYPE_PING; int length = 4; out.writeInt(0x80000000 | (SpdyConnection.VERSION & 0x7fff) << 16 | type & 0xffff);