1
0
mirror of https://github.com/square/okhttp.git synced 2026-01-17 08:42:25 +03:00

Update reader state on the reader thread.

This causes the read loop to immedaitely break–the desired behvior after receiving a close frame–without having to wait for the runnable to execute.
This commit is contained in:
Jake Wharton
2015-04-19 16:47:24 -04:00
parent 43c503fa61
commit d18d8ce08a
2 changed files with 40 additions and 21 deletions

View File

@@ -19,7 +19,10 @@ import com.squareup.okhttp.ws.WebSocketRecorder;
import java.io.IOException;
import java.net.ProtocolException;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import okio.Buffer;
import okio.BufferedSink;
import okio.ByteString;
@@ -39,12 +42,14 @@ public final class RealWebSocketTest {
// zero effect on the behavior of the WebSocket API which is why tests are only written once
// from the perspective of a single peer.
private final Executor clientExecutor = Executors.newSingleThreadExecutor();
private RealWebSocket client;
private boolean clientConnectionCloseThrows;
private boolean clientConnectionClosed;
private final Buffer client2Server = new Buffer();
private final WebSocketRecorder clientListener = new WebSocketRecorder();
private final Executor serverExecutor = Executors.newSingleThreadExecutor();
private RealWebSocket server;
private final Buffer server2client = new Buffer();
private final WebSocketRecorder serverListener = new WebSocketRecorder();
@@ -53,13 +58,7 @@ public final class RealWebSocketTest {
Random random = new Random(0);
String url = "http://example.com/websocket";
Executor synchronousExecutor = new Executor() {
@Override public void execute(Runnable command) {
command.run();
}
};
client = new RealWebSocket(true, server2client, client2Server, random, synchronousExecutor,
client = new RealWebSocket(true, server2client, client2Server, random, clientExecutor,
clientListener, url) {
@Override protected void closeConnection() throws IOException {
clientConnectionClosed = true;
@@ -68,7 +67,7 @@ public final class RealWebSocketTest {
}
}
};
server = new RealWebSocket(false, client2Server, server2client, random, synchronousExecutor,
server = new RealWebSocket(false, client2Server, server2client, random, serverExecutor,
serverListener, url) {
@Override protected void closeConnection() throws IOException {
}
@@ -109,6 +108,7 @@ public final class RealWebSocketTest {
sink.close();
server.readMessage();
serverListener.assertTextMessage("Hello!");
waitForExecutor(serverExecutor); // Pong write happens asynchronously.
client.readMessage();
clientListener.assertPong(new Buffer().writeUtf8("Pong?"));
}
@@ -116,6 +116,7 @@ public final class RealWebSocketTest {
@Test public void pingWritesPong() throws IOException, InterruptedException {
client.sendPing(new Buffer().writeUtf8("Hello!"));
server.readMessage(); // Read the ping, write the pong.
waitForExecutor(serverExecutor); // Pong write happens asynchronously.
client.readMessage(); // Read the pong.
clientListener.assertPong(new Buffer().writeUtf8("Hello!"));
}
@@ -128,9 +129,9 @@ public final class RealWebSocketTest {
@Test public void close() throws IOException {
client.close(1000, "Hello!");
server.readMessage(); // This will trigger a close response.
assertFalse(server.readMessage()); // This will trigger a close response.
serverListener.assertClose(1000, "Hello!");
client.readMessage();
assertFalse(client.readMessage());
clientListener.assertClose(1000, "Hello!");
}
@@ -224,7 +225,8 @@ public final class RealWebSocketTest {
server.readMessage(); // Read client close, send server close.
serverListener.assertClose(1000, "Hello!");
client.readMessage();
client.readMessage(); // Read server close.
waitForExecutor(clientExecutor); // Close happens asynchronously.
assertTrue(clientConnectionClosed);
clientListener.assertClose(1000, "Hello!");
}
@@ -247,6 +249,7 @@ public final class RealWebSocketTest {
assertFalse(clientConnectionClosed);
client.readMessage(); // Read close, should NOT send close.
waitForExecutor(clientExecutor); // Close happens asynchronously.
assertTrue(clientConnectionClosed);
clientListener.assertClose(1000, "Hello!");
@@ -300,4 +303,20 @@ public final class RealWebSocketTest {
server.readMessage();
serverListener.assertClose(1000, "Bye!");
}
private static void waitForExecutor(Executor executor) {
final CountDownLatch latch = new CountDownLatch(1);
executor.execute(new Runnable() {
@Override public void run() {
latch.countDown();
}
});
try {
if (!latch.await(10, TimeUnit.SECONDS)) {
throw new IllegalStateException("Timed out waiting for executor.");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

View File

@@ -69,9 +69,17 @@ public abstract class RealWebSocket implements WebSocket {
}
@Override public void onClose(final int code, final String reason) {
final boolean writeCloseResponse;
synchronized (closeLock) {
readerSentClose = true;
// If the writer has not indicated a desire to close we will write a close response.
writeCloseResponse = !writerSentClose;
}
replyExecutor.execute(new NamedRunnable("OkHttp %s WebSocket Close Reply", url) {
@Override protected void execute() {
peerClose(code, reason);
peerClose(code, reason, writeCloseResponse);
}
});
}
@@ -132,15 +140,7 @@ public abstract class RealWebSocket implements WebSocket {
}
/** Replies and closes this web socket when a close frame is read from the peer. */
private void peerClose(int code, String reason) {
boolean writeCloseResponse;
synchronized (closeLock) {
readerSentClose = true;
// If the writer has not indicated a desire to close we will write a close response.
writeCloseResponse = !writerSentClose;
}
private void peerClose(int code, String reason, boolean writeCloseResponse) {
if (writeCloseResponse) {
try {
writer.writeClose(code, reason);