From d18d8ce08a273bb73d8fe2bdbc99dfa95226ca54 Mon Sep 17 00:00:00 2001 From: Jake Wharton Date: Sun, 19 Apr 2015 16:47:24 -0400 Subject: [PATCH] Update reader state on the reader thread. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This causes the read loop to immedaitely break–the desired behvior after receiving a close frame–without having to wait for the runnable to execute. --- .../okhttp/internal/ws/RealWebSocketTest.java | 41 ++++++++++++++----- .../okhttp/internal/ws/RealWebSocket.java | 20 ++++----- 2 files changed, 40 insertions(+), 21 deletions(-) diff --git a/okhttp-ws-tests/src/test/java/com/squareup/okhttp/internal/ws/RealWebSocketTest.java b/okhttp-ws-tests/src/test/java/com/squareup/okhttp/internal/ws/RealWebSocketTest.java index 857f00c2f..241376d07 100644 --- a/okhttp-ws-tests/src/test/java/com/squareup/okhttp/internal/ws/RealWebSocketTest.java +++ b/okhttp-ws-tests/src/test/java/com/squareup/okhttp/internal/ws/RealWebSocketTest.java @@ -19,7 +19,10 @@ import com.squareup.okhttp.ws.WebSocketRecorder; import java.io.IOException; import java.net.ProtocolException; import java.util.Random; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import okio.Buffer; import okio.BufferedSink; import okio.ByteString; @@ -39,12 +42,14 @@ public final class RealWebSocketTest { // zero effect on the behavior of the WebSocket API which is why tests are only written once // from the perspective of a single peer. + private final Executor clientExecutor = Executors.newSingleThreadExecutor(); private RealWebSocket client; private boolean clientConnectionCloseThrows; private boolean clientConnectionClosed; private final Buffer client2Server = new Buffer(); private final WebSocketRecorder clientListener = new WebSocketRecorder(); + private final Executor serverExecutor = Executors.newSingleThreadExecutor(); private RealWebSocket server; private final Buffer server2client = new Buffer(); private final WebSocketRecorder serverListener = new WebSocketRecorder(); @@ -53,13 +58,7 @@ public final class RealWebSocketTest { Random random = new Random(0); String url = "http://example.com/websocket"; - Executor synchronousExecutor = new Executor() { - @Override public void execute(Runnable command) { - command.run(); - } - }; - - client = new RealWebSocket(true, server2client, client2Server, random, synchronousExecutor, + client = new RealWebSocket(true, server2client, client2Server, random, clientExecutor, clientListener, url) { @Override protected void closeConnection() throws IOException { clientConnectionClosed = true; @@ -68,7 +67,7 @@ public final class RealWebSocketTest { } } }; - server = new RealWebSocket(false, client2Server, server2client, random, synchronousExecutor, + server = new RealWebSocket(false, client2Server, server2client, random, serverExecutor, serverListener, url) { @Override protected void closeConnection() throws IOException { } @@ -109,6 +108,7 @@ public final class RealWebSocketTest { sink.close(); server.readMessage(); serverListener.assertTextMessage("Hello!"); + waitForExecutor(serverExecutor); // Pong write happens asynchronously. client.readMessage(); clientListener.assertPong(new Buffer().writeUtf8("Pong?")); } @@ -116,6 +116,7 @@ public final class RealWebSocketTest { @Test public void pingWritesPong() throws IOException, InterruptedException { client.sendPing(new Buffer().writeUtf8("Hello!")); server.readMessage(); // Read the ping, write the pong. + waitForExecutor(serverExecutor); // Pong write happens asynchronously. client.readMessage(); // Read the pong. clientListener.assertPong(new Buffer().writeUtf8("Hello!")); } @@ -128,9 +129,9 @@ public final class RealWebSocketTest { @Test public void close() throws IOException { client.close(1000, "Hello!"); - server.readMessage(); // This will trigger a close response. + assertFalse(server.readMessage()); // This will trigger a close response. serverListener.assertClose(1000, "Hello!"); - client.readMessage(); + assertFalse(client.readMessage()); clientListener.assertClose(1000, "Hello!"); } @@ -224,7 +225,8 @@ public final class RealWebSocketTest { server.readMessage(); // Read client close, send server close. serverListener.assertClose(1000, "Hello!"); - client.readMessage(); + client.readMessage(); // Read server close. + waitForExecutor(clientExecutor); // Close happens asynchronously. assertTrue(clientConnectionClosed); clientListener.assertClose(1000, "Hello!"); } @@ -247,6 +249,7 @@ public final class RealWebSocketTest { assertFalse(clientConnectionClosed); client.readMessage(); // Read close, should NOT send close. + waitForExecutor(clientExecutor); // Close happens asynchronously. assertTrue(clientConnectionClosed); clientListener.assertClose(1000, "Hello!"); @@ -300,4 +303,20 @@ public final class RealWebSocketTest { server.readMessage(); serverListener.assertClose(1000, "Bye!"); } + + private static void waitForExecutor(Executor executor) { + final CountDownLatch latch = new CountDownLatch(1); + executor.execute(new Runnable() { + @Override public void run() { + latch.countDown(); + } + }); + try { + if (!latch.await(10, TimeUnit.SECONDS)) { + throw new IllegalStateException("Timed out waiting for executor."); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } } diff --git a/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/RealWebSocket.java b/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/RealWebSocket.java index a647ac704..07d763ba4 100644 --- a/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/RealWebSocket.java +++ b/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/RealWebSocket.java @@ -69,9 +69,17 @@ public abstract class RealWebSocket implements WebSocket { } @Override public void onClose(final int code, final String reason) { + final boolean writeCloseResponse; + synchronized (closeLock) { + readerSentClose = true; + + // If the writer has not indicated a desire to close we will write a close response. + writeCloseResponse = !writerSentClose; + } + replyExecutor.execute(new NamedRunnable("OkHttp %s WebSocket Close Reply", url) { @Override protected void execute() { - peerClose(code, reason); + peerClose(code, reason, writeCloseResponse); } }); } @@ -132,15 +140,7 @@ public abstract class RealWebSocket implements WebSocket { } /** Replies and closes this web socket when a close frame is read from the peer. */ - private void peerClose(int code, String reason) { - boolean writeCloseResponse; - synchronized (closeLock) { - readerSentClose = true; - - // If the writer has not indicated a desire to close we will write a close response. - writeCloseResponse = !writerSentClose; - } - + private void peerClose(int code, String reason, boolean writeCloseResponse) { if (writeCloseResponse) { try { writer.writeClose(code, reason);